From 63a13467422515c4e5bfd96a621ec2ad3c29dd35 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 27 Mar 2025 14:50:34 +0800 Subject: [PATCH 1/7] [GLUTEN-9152][CORE] Avoid unnecessary serialization of hadoop conf --- .../gluten/backendsapi/clickhouse/CHBackend.scala | 3 +-- .../gluten/backendsapi/velox/VeloxBackend.scala | 10 +++------- .../apache/gluten/utils/ParquetMetadataUtils.scala | 14 +++++--------- .../gluten/backendsapi/BackendSettingsApi.scala | 5 +++-- .../execution/BasicScanExecTransformer.scala | 6 +----- .../gluten/execution/WholeStageTransformer.scala | 8 ++------ 6 files changed, 15 insertions(+), 31 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 728c5c9a76b2..78850e56b378 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -44,7 +44,6 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration import java.util.Locale @@ -175,7 +174,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { fields: Array[StructField], rootPaths: Seq[String], properties: Map[String, String], - serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = { + hadoopConf: Configuration): ValidationResult = { // Validate if all types are supported. def hasComplexType: Boolean = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 23fec4bbdc20..b194e2183e67 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -48,8 +48,8 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa import org.apache.spark.sql.hive.execution.HiveFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import scala.util.control.Breaks.breakable @@ -103,7 +103,7 @@ object VeloxBackendSettings extends BackendSettingsApi { fields: Array[StructField], rootPaths: Seq[String], properties: Map[String, String], - serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = { + hadoopConf: Configuration): ValidationResult = { def validateScheme(): Option[String] = { val filteredRootPaths = distinctRootPaths(rootPaths) @@ -187,11 +187,7 @@ object VeloxBackendSettings extends BackendSettingsApi { val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit val encryptionResult = - ParquetMetadataUtils.validateEncryption( - format, - rootPaths, - serializableHadoopConf, - fileLimit) + ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf, fileLimit) if (encryptionResult.ok()) { None } else { diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index 48d0629268da..eb917a4772d2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -21,8 +21,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat -import org.apache.spark.util.SerializableConfiguration - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator} @@ -38,29 +36,27 @@ object ParquetMetadataUtils { * File format, e.g., `ParquetReadFormat` * @param rootPaths * List of file paths to scan - * @param serializableHadoopConf - * Optional Hadoop configuration + * @param hadoopConf + * Hadoop configuration * @return * [[ValidationResult]] validation success or failure */ def validateEncryption( format: ReadFileFormat, rootPaths: Seq[String], - serializableHadoopConf: Option[SerializableConfiguration], + hadoopConf: Configuration, fileLimit: Int ): ValidationResult = { if (format != ParquetReadFormat || rootPaths.isEmpty) { return ValidationResult.succeeded } - val conf = serializableHadoopConf.map(_.value).getOrElse(new Configuration()) - rootPaths.foreach { rootPath => - val fs = new Path(rootPath).getFileSystem(conf) + val fs = new Path(rootPath).getFileSystem(hadoopConf) try { val encryptionDetected = - checkForEncryptionWithLimit(fs, new Path(rootPath), conf, fileLimit = fileLimit) + checkForEncryptionWithLimit(fs, new Path(rootPath), hadoopConf, fileLimit = fileLimit) if (encryptionDetected) { return ValidationResult.failed("Encrypted Parquet file detected.") } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 937d503c1fe5..c9a3fdbc6dd4 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.types.StructField -import org.apache.spark.util.SerializableConfiguration + +import org.apache.hadoop.conf.Configuration trait BackendSettingsApi { @@ -41,7 +42,7 @@ trait BackendSettingsApi { fields: Array[StructField], rootPaths: Seq[String], properties: Map[String, String], - serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = + hadoopConf: Configuration): ValidationResult = ValidationResult.succeeded def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index b5d3c03fc6ef..056c35a527cb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.util.SerializableConfiguration import com.google.protobuf.StringValue import io.substrait.proto.NamedStruct @@ -78,9 +77,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource getProperties)) } - val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( - sparkContext.hadoopConfiguration) - override protected def doValidateInternal(): ValidationResult = { var fields = schema.fields @@ -100,7 +96,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource fields, getRootFilePaths, getProperties, - Some(serializableHadoopConf)) + sparkContext.hadoopConfiguration) if (!validationResult.ok()) { return validationResult } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index b1b0f3ddfcd0..df2911042662 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -42,7 +42,6 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils @@ -236,9 +235,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val sparkConf: SparkConf = sparkContext.getConf - val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( - sparkContext.hadoopConfiguration) - val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo @transient @@ -400,7 +396,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.value) + sparkContext.hadoopConfiguration) splitInfo.setPaths(newPaths.asJava) } } @@ -463,7 +459,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.value) + sparkContext.hadoopConfiguration) splitInfo.setPaths(newPaths.asJava) } } From ed3035a81f986985c541ae3781a99514bcd8825b Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 27 Mar 2025 15:51:10 +0800 Subject: [PATCH 2/7] fix --- .../org/apache/gluten/backendsapi/clickhouse/CHBackend.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 78850e56b378..b136f372c811 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -45,6 +45,8 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.hadoop.conf.Configuration + import java.util.Locale import scala.util.control.Breaks.{break, breakable} From 91475327d6f13b80aa825b3c4ad6766ab33e35fa Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 1 Apr 2025 17:41:48 +0800 Subject: [PATCH 3/7] temporary solution for oom --- .../apache/gluten/execution/WholeStageTransformer.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index df2911042662..77ca98975ca8 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils @@ -235,6 +236,9 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val sparkConf: SparkConf = sparkContext.getConf + lazy val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( + sparkContext.hadoopConfiguration) + val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo @transient @@ -396,7 +400,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - sparkContext.hadoopConfiguration) + serializableHadoopConf.value) splitInfo.setPaths(newPaths.asJava) } } @@ -459,7 +463,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - sparkContext.hadoopConfiguration) + serializableHadoopConf.value) splitInfo.setPaths(newPaths.asJava) } } From fab6b38483f2232f170ddeb32a80bc7b0b020dcd Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 1 Apr 2025 18:26:55 +0800 Subject: [PATCH 4/7] fix --- .../org/apache/gluten/execution/WholeStageTransformer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 77ca98975ca8..8a6de2f8809e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -236,6 +236,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val sparkConf: SparkConf = sparkContext.getConf + @transient lazy val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( sparkContext.hadoopConfiguration) From cfc64a042caecf9f655b9be10e79a572bd6965ba Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 1 Apr 2025 21:53:47 +0800 Subject: [PATCH 5/7] fix --- .../org/apache/gluten/execution/WholeStageTransformer.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 8a6de2f8809e..77ca98975ca8 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -236,7 +236,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val sparkConf: SparkConf = sparkContext.getConf - @transient lazy val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( sparkContext.hadoopConfiguration) From 41904be792e9a4e7f26cc63c1991c7fbd8fe70ca Mon Sep 17 00:00:00 2001 From: zml1206 Date: Wed, 2 Apr 2025 14:10:12 +0800 Subject: [PATCH 6/7] fix --- .../execution/WholeStageTransformer.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 77ca98975ca8..a945ee8cc3e2 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -236,8 +236,14 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val sparkConf: SparkConf = sparkContext.getConf - lazy val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( - sparkContext.hadoopConfiguration) + // Temporary solution to avoid unnecessary serialization of hadoop conf, using @transient will + // cause OOM failure for unknown reasons in GlutenHashExpressionsSuite of Spark 3.5. + val serializableHadoopConf: Option[SerializableConfiguration] = + if (GlutenConfig.get.enableHdfsViewfs) { + Some(new SerializableConfiguration(sparkContext.hadoopConfiguration)) + } else { + None + } val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo @@ -391,7 +397,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val allInputPartitions = leafTransformers.map(_.getPartitions) val allSplitInfos = getSplitInfosFromPartitions(leafTransformers) - if (GlutenConfig.get.enableHdfsViewfs) { + if (serializableHadoopConf.nonEmpty) { val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty allSplitInfos.foreach { splitInfos => @@ -400,7 +406,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.value) + serializableHadoopConf.get.value) splitInfo.setPaths(newPaths.asJava) } } @@ -456,14 +462,14 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f throw new GlutenException( "The partition length of all the leaf transformer are not the same.") } - if (GlutenConfig.get.enableHdfsViewfs) { + if (serializableHadoopConf.nonEmpty) { val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty allSplitInfos.foreach { case splitInfo: LocalFilesNode => val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.value) + serializableHadoopConf.get.value) splitInfo.setPaths(newPaths.asJava) } } From d6c6d0cb17f5c0828550373976c2bb59c84d7178 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Tue, 8 Apr 2025 12:01:05 +0800 Subject: [PATCH 7/7] fix --- .../execution/WholeStageTransformer.scala | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index a945ee8cc3e2..df2911042662 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -42,7 +42,6 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils @@ -236,15 +235,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val sparkConf: SparkConf = sparkContext.getConf - // Temporary solution to avoid unnecessary serialization of hadoop conf, using @transient will - // cause OOM failure for unknown reasons in GlutenHashExpressionsSuite of Spark 3.5. - val serializableHadoopConf: Option[SerializableConfiguration] = - if (GlutenConfig.get.enableHdfsViewfs) { - Some(new SerializableConfiguration(sparkContext.hadoopConfiguration)) - } else { - None - } - val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo @transient @@ -397,7 +387,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val allInputPartitions = leafTransformers.map(_.getPartitions) val allSplitInfos = getSplitInfosFromPartitions(leafTransformers) - if (serializableHadoopConf.nonEmpty) { + if (GlutenConfig.get.enableHdfsViewfs) { val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty allSplitInfos.foreach { splitInfos => @@ -406,7 +396,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.get.value) + sparkContext.hadoopConfiguration) splitInfo.setPaths(newPaths.asJava) } } @@ -462,14 +452,14 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f throw new GlutenException( "The partition length of all the leaf transformer are not the same.") } - if (serializableHadoopConf.nonEmpty) { + if (GlutenConfig.get.enableHdfsViewfs) { val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty allSplitInfos.foreach { case splitInfo: LocalFilesNode => val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.get.value) + sparkContext.hadoopConfiguration) splitInfo.setPaths(newPaths.asJava) } }