diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index d71d41bf2d4c..dcdb5dcc5d38 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -44,7 +44,8 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration + +import org.apache.hadoop.conf.Configuration import java.util.Locale @@ -177,7 +178,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { fields: Array[StructField], rootPaths: Seq[String], properties: Map[String, String], - serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = { + hadoopConf: Configuration): ValidationResult = { // Validate if all types are supported. def hasComplexType: Boolean = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 23fec4bbdc20..b194e2183e67 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -48,8 +48,8 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa import org.apache.spark.sql.hive.execution.HiveFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import scala.util.control.Breaks.breakable @@ -103,7 +103,7 @@ object VeloxBackendSettings extends BackendSettingsApi { fields: Array[StructField], rootPaths: Seq[String], properties: Map[String, String], - serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = { + hadoopConf: Configuration): ValidationResult = { def validateScheme(): Option[String] = { val filteredRootPaths = distinctRootPaths(rootPaths) @@ -187,11 +187,7 @@ object VeloxBackendSettings extends BackendSettingsApi { val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit val encryptionResult = - ParquetMetadataUtils.validateEncryption( - format, - rootPaths, - serializableHadoopConf, - fileLimit) + ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf, fileLimit) if (encryptionResult.ok()) { None } else { diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index 48d0629268da..eb917a4772d2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -21,8 +21,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat -import org.apache.spark.util.SerializableConfiguration - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator} @@ -38,29 +36,27 @@ object ParquetMetadataUtils { * File format, e.g., `ParquetReadFormat` * @param rootPaths * List of file paths to scan - * @param serializableHadoopConf - * Optional Hadoop configuration + * @param hadoopConf + * Hadoop configuration * @return * [[ValidationResult]] validation success or failure */ def validateEncryption( format: ReadFileFormat, rootPaths: Seq[String], - serializableHadoopConf: Option[SerializableConfiguration], + hadoopConf: Configuration, fileLimit: Int ): ValidationResult = { if (format != ParquetReadFormat || rootPaths.isEmpty) { return ValidationResult.succeeded } - val conf = serializableHadoopConf.map(_.value).getOrElse(new Configuration()) - rootPaths.foreach { rootPath => - val fs = new Path(rootPath).getFileSystem(conf) + val fs = new Path(rootPath).getFileSystem(hadoopConf) try { val encryptionDetected = - checkForEncryptionWithLimit(fs, new Path(rootPath), conf, fileLimit = fileLimit) + checkForEncryptionWithLimit(fs, new Path(rootPath), hadoopConf, fileLimit = fileLimit) if (encryptionDetected) { return ValidationResult.failed("Encrypted Parquet file detected.") } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 937d503c1fe5..c9a3fdbc6dd4 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.types.StructField -import org.apache.spark.util.SerializableConfiguration + +import org.apache.hadoop.conf.Configuration trait BackendSettingsApi { @@ -41,7 +42,7 @@ trait BackendSettingsApi { fields: Array[StructField], rootPaths: Seq[String], properties: Map[String, String], - serializableHadoopConf: Option[SerializableConfiguration] = None): ValidationResult = + hadoopConf: Configuration): ValidationResult = ValidationResult.succeeded def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index b5d3c03fc6ef..056c35a527cb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.util.SerializableConfiguration import com.google.protobuf.StringValue import io.substrait.proto.NamedStruct @@ -78,9 +77,6 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource getProperties)) } - val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( - sparkContext.hadoopConfiguration) - override protected def doValidateInternal(): ValidationResult = { var fields = schema.fields @@ -100,7 +96,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource fields, getRootFilePaths, getProperties, - Some(serializableHadoopConf)) + sparkContext.hadoopConfiguration) if (!validationResult.ok()) { return validationResult } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index b1b0f3ddfcd0..df2911042662 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -42,7 +42,6 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration import com.google.common.collect.Lists import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils @@ -236,9 +235,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val sparkConf: SparkConf = sparkContext.getConf - val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration( - sparkContext.hadoopConfiguration) - val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.get.numaBindingInfo @transient @@ -400,7 +396,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.value) + sparkContext.hadoopConfiguration) splitInfo.setPaths(newPaths.asJava) } } @@ -463,7 +459,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val newPaths = ViewFileSystemUtils.convertViewfsToHdfs( splitInfo.getPaths.asScala.toSeq, viewfsToHdfsCache, - serializableHadoopConf.value) + sparkContext.hadoopConfiguration) splitInfo.setPaths(newPaths.asJava) } }