Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

import org.apache.hadoop.conf.Configuration

import java.util.Locale

Expand Down Expand Up @@ -177,7 +178,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
fields: Array[StructField],
rootPaths: Seq[String],
properties: Map[String, String],
serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {
hadoopConf: Configuration): ValidationResult = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialization code was added for viewfs support, as we need to pass some info down to Velox. Not sure whether the new code can be serialized/deserialized and passed down.
The code itself is correct, but there are no viewfs-related tests in GHA right now, so we may need to check this manually.

Cc @wangyum @JkSelf

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But from the code logic, I did not find that Velox uses the Hadoop conf here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangyum Could you please help in verifying this PR in your ViewFS environment? My ViewFS test setup is currently not functioning properly.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, will test it soon.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested this patch on our cluster and it still works.


// Validate if all types are supported.
def hasComplexType: Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import scala.util.control.Breaks.breakable
Expand Down Expand Up @@ -103,7 +103,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
fields: Array[StructField],
rootPaths: Seq[String],
properties: Map[String, String],
serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = {
hadoopConf: Configuration): ValidationResult = {

def validateScheme(): Option[String] = {
val filteredRootPaths = distinctRootPaths(rootPaths)
Expand Down Expand Up @@ -187,11 +187,7 @@ object VeloxBackendSettings extends BackendSettingsApi {

val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit
val encryptionResult =
ParquetMetadataUtils.validateEncryption(
format,
rootPaths,
serializableHadoopConf,
fileLimit)
ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf, fileLimit)
if (encryptionResult.ok()) {
None
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat

import org.apache.spark.util.SerializableConfiguration

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}

Expand All @@ -38,29 +36,27 @@ object ParquetMetadataUtils {
* File format, e.g., `ParquetReadFormat`
* @param rootPaths
* List of file paths to scan
* @param serializableHadoopConf
* Optional Hadoop configuration
* @param hadoopConf
* Hadoop configuration
* @return
* [[ValidationResult]] validation success or failure
*/
def validateEncryption(
format: ReadFileFormat,
rootPaths: Seq[String],
serializableHadoopConf: Option[SerializableConfiguration],
hadoopConf: Configuration,
fileLimit: Int
): ValidationResult = {
if (format != ParquetReadFormat || rootPaths.isEmpty) {
return ValidationResult.succeeded
}

val conf = serializableHadoopConf.map(_.value).getOrElse(new Configuration())

rootPaths.foreach {
rootPath =>
val fs = new Path(rootPath).getFileSystem(conf)
val fs = new Path(rootPath).getFileSystem(hadoopConf)
try {
val encryptionDetected =
checkForEncryptionWithLimit(fs, new Path(rootPath), conf, fileLimit = fileLimit)
checkForEncryptionWithLimit(fs, new Path(rootPath), hadoopConf, fileLimit = fileLimit)
if (encryptionDetected) {
return ValidationResult.failed("Encrypted Parquet file detected.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.SerializableConfiguration

import org.apache.hadoop.conf.Configuration

trait BackendSettingsApi {

Expand All @@ -41,7 +42,7 @@ trait BackendSettingsApi {
fields: Array[StructField],
rootPaths: Seq[String],
properties: Map[String, String],
serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult =
hadoopConf: Configuration): ValidationResult =
ValidationResult.succeeded

def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct
Expand Down Expand Up @@ -78,9 +77,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
getProperties))
}

val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
sparkContext.hadoopConfiguration)

override protected def doValidateInternal(): ValidationResult = {
var fields = schema.fields

Expand All @@ -100,7 +96,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
fields,
getRootFilePaths,
getProperties,
Some(serializableHadoopConf))
sparkContext.hadoopConfiguration)
if (!validationResult.ok()) {
return validationResult
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

import com.google.common.collect.Lists
import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils
Expand Down Expand Up @@ -236,9 +235,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f

val sparkConf: SparkConf = sparkContext.getConf

val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding `@transient lazy` could fix the issue.

Copy link
Copy Markdown
Contributor Author

@zml1206 zml1206 Mar 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If serialization is required, `lazy` is a solution to avoid serialization when viewFS is not enabled; but if serialization is not needed here at all, it would be simpler to use sparkContext.hadoopConfiguration directly.

sparkContext.hadoopConfiguration)

val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo

@transient
Expand Down Expand Up @@ -400,7 +396,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
splitInfo.getPaths.asScala.toSeq,
viewfsToHdfsCache,
serializableHadoopConf.value)
sparkContext.hadoopConfiguration)
splitInfo.setPaths(newPaths.asJava)
}
}
Expand Down Expand Up @@ -463,7 +459,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
splitInfo.getPaths.asScala.toSeq,
viewfsToHdfsCache,
serializableHadoopConf.value)
sparkContext.hadoopConfiguration)
splitInfo.setPaths(newPaths.asJava)
}
}
Expand Down
Loading