From c4b8251380d3fb66233839f36e5efcabafa4a8a4 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Tue, 18 Oct 2016 15:39:36 -0700 Subject: [PATCH 1/7] [SPARK-17993][SQL] Perform Parquet log output redirection when constructing an instance of `ParquetFileFormat`. Before, it was occurring as part of the call to `inferSchema` and in `prepareWrite`, however not all Parquet access occurs through one of those methods. We add this redirection to the constructor to ensure that any instantiation of `ParquetFileFormat` triggers log redirection if it hasn't already happened --- .../datasources/parquet/ParquetFileFormat.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index b8ea7f40c4ab3..be9832c007a1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -56,6 +56,7 @@ class ParquetFileFormat with DataSourceRegister with Logging with Serializable { + ParquetFileFormat.redirectParquetLogs override def shortName(): String = "parquet" @@ -129,8 +130,6 @@ class ParquetFileFormat conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) } - ParquetFileFormat.redirectParquetLogs() - new OutputWriterFactory { override def newInstance( path: String, @@ -682,7 +681,7 @@ object ParquetFileFormat extends Logging { // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here // we redirect the JUL logger via SLF4J JUL bridge handler. - val redirectParquetLogsViaSLF4J: Unit = { + private val redirectParquetLogsViaSLF4J: Unit = { def redirect(logger: JLogger): Unit = { logger.getHandlers.foreach(logger.removeHandler) logger.setUseParentHandlers(false) @@ -710,7 +709,8 @@ object ParquetFileFormat extends Logging { } /** - * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`. + * The ParquetFileFormat constructor calls this function to initialize + * `redirectParquetLogsViaSLF4J`. */ - def redirectParquetLogs(): Unit = {} + private def redirectParquetLogs(): Unit = {} } From ad4ce9e7999af86f2904180200ec35186b794bf3 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Thu, 20 Oct 2016 11:48:09 -0700 Subject: [PATCH 2/7] Improve code documentation to clarify the reasoning behind this strategy --- .../datasources/parquet/ParquetFileFormat.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index be9832c007a1b..0ad3414dec66d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -56,7 +56,13 @@ class ParquetFileFormat with DataSourceRegister with Logging with Serializable { - ParquetFileFormat.redirectParquetLogs + // Poor man's "static initializer". Scala doesn't have language support for static initializers, + // and it's important that we initialize `ParquetFileFormat.redirectParquetLogsViaSLF4J` before + // doing anything with the Parquet libraries. Rather than expect clients to initialize the + // `ParquetFileFormat` singleton object at the right time, we put that initialization in the + // constructor of this class. This method is idempotent, and essentially a no-op after its first + // call. + ParquetFileFormat.ensureParquetLogRedirection override def shortName(): String = "parquet" @@ -709,8 +715,9 @@ object ParquetFileFormat extends Logging { } /** - * The ParquetFileFormat constructor calls this function to initialize - * `redirectParquetLogsViaSLF4J`. + * The `ParquetFileFormat` constructor calls this method to ensure that Parquet library log + * output is redirected through the SLF4J JUL bridge handler. This method is a no-op because we + * only require that `redirectParquetLogsViaSLF4J` is initialized. This accomplishes that task. */ - private def redirectParquetLogs(): Unit = {} + private def ensureParquetLogRedirection(): Unit = {} } From 42c44b0391555bcbc0a4aabee5ebf9caf0a4014b Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Tue, 25 Oct 2016 17:17:59 -0700 Subject: [PATCH 3/7] Set parquet library logging threshold to ERROR for testing --- sql/core/src/test/resources/log4j.properties | 4 ++-- sql/hive/src/test/resources/log4j.properties | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties index 33b9ecf1e2826..25b817382195a 100644 --- a/sql/core/src/test/resources/log4j.properties +++ b/sql/core/src/test/resources/log4j.properties @@ -53,5 +53,5 @@ log4j.additivity.hive.ql.metadata.Hive=false log4j.logger.hive.ql.metadata.Hive=OFF # Parquet related logging -log4j.logger.org.apache.parquet.hadoop=WARN -log4j.logger.org.apache.spark.sql.parquet=INFO +log4j.logger.org.apache.parquet=ERROR +log4j.logger.parquet=ERROR diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties index fea3404769d9d..072bb25d30a87 100644 --- a/sql/hive/src/test/resources/log4j.properties +++ b/sql/hive/src/test/resources/log4j.properties @@ -59,3 +59,7 @@ log4j.logger.hive.ql.metadata.Hive=OFF log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR + +# Parquet related logging +log4j.logger.org.apache.parquet=ERROR +log4j.logger.parquet=ERROR From 07ba0be8b671fa55d2e7259857946c1bd1abb13b Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Thu, 27 Oct 2016 09:53:51 -0700 Subject: [PATCH 4/7] Set def ensureParquetLogRedirection(): Unit = redirectParquetLogsViaSLF4J --- .../execution/datasources/parquet/ParquetFileFormat.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 0ad3414dec66d..07838d04fddde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -716,8 +716,7 @@ object ParquetFileFormat extends Logging { /** * The `ParquetFileFormat` constructor calls this method to ensure that Parquet library log - * output is redirected through the SLF4J JUL bridge handler. This method is a no-op because we - * only require that `redirectParquetLogsViaSLF4J` is initialized. This accomplishes that task. + * output is redirected through the SLF4J JUL bridge handler. */ - private def ensureParquetLogRedirection(): Unit = {} + private def ensureParquetLogRedirection(): Unit = redirectParquetLogsViaSLF4J } From 696ebc0eb2f763c9245dc98903a62158d1b4d9d8 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Sun, 30 Oct 2016 10:13:59 -0700 Subject: [PATCH 5/7] Call ParquetFileFormat.ensureParquetLogRedirection when deserializing ParquetFileFormat --- .../datasources/parquet/ParquetFileFormat.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 07838d04fddde..01ed9be165fa6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.io.ObjectInputStream import java.net.URI import java.util.logging.{Logger => JLogger} @@ -64,6 +65,14 @@ class ParquetFileFormat // call. ParquetFileFormat.ensureParquetLogRedirection + // Java serialization will not call the default constructor. Make sure we call + // ParquetFileFormat.ensureParquetLogRedirection in deserialization by implementing this hook + // method. + private def readObject(in: ObjectInputStream): Unit = { + in.defaultReadObject + ParquetFileFormat.ensureParquetLogRedirection + } + override def shortName(): String = "parquet" override def toString: String = "ParquetFormat" From b6f7e20bf73d4354c38ba222b9b32fd9750a6fdd Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Sun, 30 Oct 2016 11:36:23 -0700 Subject: [PATCH 6/7] Call ParquetFileFormat.ensureParquetLogRedirection when deserializing Parquet output writer factories --- .../datasources/parquet/ParquetFileFormat.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 01ed9be165fa6..3cc692affc8e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -146,7 +146,14 @@ class ParquetFileFormat } new OutputWriterFactory { - override def newInstance( + // OutputWriterFactory is deserialized in the write path on the executor side before any + // output is actually written. Redirect Parquet logs at this time. + private def readObject(in: ObjectInputStream): Unit = { + in.defaultReadObject + ParquetFileFormat.ensureParquetLogRedirection + } + + override def newInstance( path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { From 247ef91dc444bcba0751e9a7d76938c2bea07097 Mon Sep 17 00:00:00 2001 From: Michael Allman Date: Wed, 9 Nov 2016 13:19:51 -0800 Subject: [PATCH 7/7] Move Parquet log redirection code to a serializable Java class ParquetLogRedirector which performs the redirection as part of its class initialization --- .../parquet/ParquetLogRedirector.java | 72 ++++++++++++++++++ .../parquet/ParquetFileFormat.scala | 76 +++---------------- 2 files changed, 82 insertions(+), 66 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java new file mode 100644 index 0000000000000..7a7f32ee1e87b --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet; + +import java.io.Serializable; +import java.util.logging.Handler; +import java.util.logging.Logger; + +import org.apache.parquet.Log; +import org.slf4j.bridge.SLF4JBridgeHandler; + +// Redirects the JUL logging for parquet-mr versions <= 1.8 to SLF4J logging using +// SLF4JBridgeHandler. Parquet-mr versions >= 1.9 use SLF4J directly +final class ParquetLogRedirector implements Serializable { + // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is + // especially important for Serializable classes where fields are set but constructors are + // ignored + static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector(); + + // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC. + // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep + // references to loggers in both parquet-mr <= 1.6 and 1.7/1.8 + private static final Logger apacheParquetLogger = + Logger.getLogger(Log.class.getPackage().getName()); + private static final Logger parquetLogger = Logger.getLogger("parquet"); + + static { + // For parquet-mr 1.7 and 1.8, which are under `org.apache.parquet` namespace. + try { + Class.forName(Log.class.getName()); + redirect(Logger.getLogger(Log.class.getPackage().getName())); + } catch (ClassNotFoundException ex) { + throw new RuntimeException(ex); + } + + // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet` + // namespace. + try { + Class.forName("parquet.Log"); + redirect(Logger.getLogger("parquet")); + } catch (Throwable t) { + // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly + // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block + // should be removed after this issue is fixed. + } + } + + private ParquetLogRedirector() { + } + + private static void redirect(Logger logger) { + for (Handler handler : logger.getHandlers()) { + logger.removeHandler(handler); + } + logger.setUseParentHandlers(false); + logger.addHandler(new SLF4JBridgeHandler()); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 3cc692affc8e3..031a0fe57893f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -17,9 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.io.ObjectInputStream import java.net.URI -import java.util.logging.{Logger => JLogger} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -30,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.parquet.{Log => ApacheParquetLog} import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.schema.MessageType -import org.slf4j.bridge.SLF4JBridgeHandler import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging @@ -57,21 +53,11 @@ class ParquetFileFormat with DataSourceRegister with Logging with Serializable { - // Poor man's "static initializer". Scala doesn't have language support for static initializers, - // and it's important that we initialize `ParquetFileFormat.redirectParquetLogsViaSLF4J` before - // doing anything with the Parquet libraries. Rather than expect clients to initialize the - // `ParquetFileFormat` singleton object at the right time, we put that initialization in the - // constructor of this class. This method is idempotent, and essentially a no-op after its first - // call. - ParquetFileFormat.ensureParquetLogRedirection - - // Java serialization will not call the default constructor. Make sure we call - // ParquetFileFormat.ensureParquetLogRedirection in deserialization by implementing this hook - // method. - private def readObject(in: ObjectInputStream): Unit = { - in.defaultReadObject - ParquetFileFormat.ensureParquetLogRedirection - } + // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This + // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat + // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field + // here. + private val parquetLogRedirector = ParquetLogRedirector.INSTANCE override def shortName(): String = "parquet" @@ -146,12 +132,11 @@ class ParquetFileFormat } new OutputWriterFactory { - // OutputWriterFactory is deserialized in the write path on the executor side before any - // output is actually written. Redirect Parquet logs at this time. - private def readObject(in: ObjectInputStream): Unit = { - in.defaultReadObject - ParquetFileFormat.ensureParquetLogRedirection - } + // This OutputWriterFactory instance is deserialized when writing Parquet files on the + // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold + // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is + // initialized. + private val parquetLogRedirector = ParquetLogRedirector.INSTANCE override def newInstance( path: String, @@ -694,45 +679,4 @@ object ParquetFileFormat extends Logging { Failure(cause) }.toOption } - - // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC. - // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep - // references to loggers in both parquet-mr <= 1.6 and >= 1.7 - val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName) - val parquetLogger: JLogger = JLogger.getLogger("parquet") - - // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here - // we redirect the JUL logger via SLF4J JUL bridge handler. - private val redirectParquetLogsViaSLF4J: Unit = { - def redirect(logger: JLogger): Unit = { - logger.getHandlers.foreach(logger.removeHandler) - logger.setUseParentHandlers(false) - logger.addHandler(new SLF4JBridgeHandler) - } - - // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace. - // scalastyle:off classforname - Class.forName(classOf[ApacheParquetLog].getName) - // scalastyle:on classforname - redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)) - - // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet` - // namespace. - try { - // scalastyle:off classforname - Class.forName("parquet.Log") - // scalastyle:on classforname - redirect(JLogger.getLogger("parquet")) - } catch { case _: Throwable => - // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly - // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block - // should be removed after this issue is fixed. - } - } - - /** - * The `ParquetFileFormat` constructor calls this method to ensure that Parquet library log - * output is redirected through the SLF4J JUL bridge handler. - */ - private def ensureParquetLogRedirection(): Unit = redirectParquetLogsViaSLF4J }