From 10c8f8c28f75454dca681151d4801e5a0bddad6a Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 10 Jan 2025 12:01:00 +0000 Subject: [PATCH 01/10] update --- .../gluten/utils/ParquetMetadataUtils.scala | 34 +--- .../ParquetEncryptionDetectionSuite.scala | 178 ++++++++++++++++++ pom.xml | 2 +- .../apache/gluten/sql/shims/SparkShims.scala | 5 + .../sql/shims/spark32/Spark32Shims.scala | 21 ++- .../apache/gluten/utils/ExceptionUtils.scala | 45 +++++ .../sql/shims/spark33/Spark33Shims.scala | 21 +++ .../sql/shims/spark34/Spark34Shims.scala | 15 +- .../sql/shims/spark35/Spark35Shims.scala | 14 ++ 9 files changed, 300 insertions(+), 35 deletions(-) create mode 100644 backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala create mode 100644 shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index 6a96789d512e..9f43575cf90c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -17,6 +17,7 @@ package org.apache.gluten.utils import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat @@ -24,8 +25,6 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator} -import org.apache.parquet.crypto.ParquetCryptoRuntimeException -import org.apache.parquet.hadoop.ParquetFileReader object ParquetMetadataUtils { @@ -98,38 +97,9 @@ object ParquetMetadataUtils { while (filesIterator.hasNext && checkedFileCount < fileLimit) { val fileStatus = filesIterator.next() checkedFileCount += 1 - try { - ParquetFileReader.readFooter(conf, fileStatus.getPath).toString - } catch { - case e: Exception if hasCause(e, classOf[ParquetCryptoRuntimeException]) => - return true - case e: Exception => - } - } - false - } - - /** - * Utility to check the exception for the specified type. Parquet 1.12 does not provide direct - * utility to check for encryption. Newer versions provide utility to check encryption from read - * footer which can be used in the future once Spark brings it in. - * - * @param throwable - * Exception to check - * @param causeType - * Class of the cause to look for - * @tparam T - * Type of the cause - * @return - * True if the cause is found; false otherwise - */ - private def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { - var current = throwable - while (current != null) { - if (causeType.isInstance(current)) { + if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, conf)) { return true } - current = current.getCause } false } diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala new file mode 100644 index 000000000000..9e1c26010bf9 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.utils + +import org.apache.gluten.sql.shims.SparkShimLoader + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} +import org.apache.parquet.crypto.{ColumnEncryptionProperties, FileEncryptionProperties} +import org.apache.parquet.example.data.simple.SimpleGroup +import org.apache.parquet.hadoop.example.ExampleParquetWriter +import org.apache.parquet.hadoop.metadata.ColumnPath +import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types} +import org.junit.Assert._ +import org.scalatest.funsuite.AnyFunSuite + +import java.io.File +import java.nio.charset.StandardCharsets +import java.util.Base64 + +import scala.collection.JavaConverters._ +// scalastyle:off println + +class ParquetEncryptionDetectionSuite extends AnyFunSuite { + + // Encryption keys + private val masterKey = + Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8)) + private val columnKey = + Base64.getEncoder.encodeToString("1234567890123456".getBytes(StandardCharsets.UTF_8)) + + // Schema definition + private val schema: MessageType = Types + .buildMessage() + .addField( + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED).named("id")) + .addField( + Types + .primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) + .named("name")) + .named("TestSchema") + + // Helper to write a Parquet file + private def writeParquet( + path: String, + encryptionProperties: Option[FileEncryptionProperties], + data: Seq[Map[String, Any]] + ): Unit = { + val configuration = new Configuration() + val writerBuilder = ExampleParquetWriter + .builder(new Path(path)) + .withConf(configuration) + .withType(schema) + .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE) + + encryptionProperties.foreach(writerBuilder.withEncryption) + + val writer = writerBuilder.build() + try { + data.foreach { + row => + val group = new SimpleGroup(schema) + row.foreach { + case (key, value) => + value match { + case i: Int => group.add(key, i) + case s: String => group.add(key, s) + } + } + writer.write(group) + } + } finally { + writer.close() + } + } + + // Helper to get LocatedFileStatus + private def getLocatedFileStatus(path: String): LocatedFileStatus = { + val conf = new Configuration() + val fs = FileSystem.get(conf) + fs.listFiles(new Path(path), false).next() + } + + // Helper to create and clean temporary directories + private def withTempDir(testCode: File => Any): Unit = { + val tempDir = File.createTempFile("test", "").getCanonicalFile + if (tempDir.exists()) { + tempDir.delete() + } + tempDir.mkdir() + try { + testCode(tempDir) + } finally { + tempDir.delete() + } + } + + // Test: Detect encrypted Parquet with encrypted footer + test("Detect encrypted Parquet with encrypted footer") { + withTempDir { + tempDir => + val filePath = s"${tempDir.getAbsolutePath}/encrypted_footer.parquet" + val encryptionProps = FileEncryptionProperties + .builder(Base64.getDecoder.decode(masterKey)) + .withEncryptedColumns( + Map( + ColumnPath.get("name") -> ColumnEncryptionProperties + .builder(ColumnPath.get("name")) + .withKey(Base64.getDecoder.decode(columnKey)) + .build()).asJava) + .build() + + writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Alice"))) + val fileStatus = getLocatedFileStatus(filePath) + + assertTrue( + SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + } + } + + // Test: Detect encrypted Parquet without encrypted footer (plaintext footer) + test("Detect encrypted Parquet without encrypted footer (plaintext footer)") { + withTempDir { + tempDir => + val filePath = s"${tempDir.getAbsolutePath}/plaintext_footer.parquet" + val encryptionProps = FileEncryptionProperties + .builder(Base64.getDecoder.decode(masterKey)) + .withEncryptedColumns( + Map( + ColumnPath.get("name") -> ColumnEncryptionProperties + .builder(ColumnPath.get("name")) + .withKey(Base64.getDecoder.decode(columnKey)) + .build()).asJava) + .withPlaintextFooter() + .build() + + writeParquet( + "/home/user/plaintext_footer.parquet", + Some(encryptionProps), + Seq(Map("id" -> 1, "name" -> "Bob"))) + val fileStatus = getLocatedFileStatus("/home/user/plaintext_footer.parquet") +// ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath) + println( + "Parquet file utils, 152, ", + SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + assertTrue( + SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + } + } + + // Test: Detect plain (unencrypted) Parquet file + test("Detect plain (unencrypted) Parquet file") { + withTempDir { + tempDir => + val filePath = s"${tempDir.getAbsolutePath}/plain.parquet" + + writeParquet(filePath, None, Seq(Map("id" -> 1, "name" -> "Charlie"))) + val fileStatus = getLocatedFileStatus(filePath) + + assertFalse( + SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + } + } +} diff --git a/pom.xml b/pom.xml index 9cdc3f2c492d..536384d1c47d 100644 --- a/pom.xml +++ b/pom.xml @@ -1367,7 +1367,7 @@ ${project.basedir}/src/test/scala ${project.basedir}/scalastyle-output.xml UTF-8 - dev/scalastyle-config.xml + /home/user/gluten/dev/scalastyle-config.xml diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index fba6a4a5a48a..681e0f583d5d 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -48,7 +48,9 @@ import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.LocatedFileStatus import org.apache.parquet.schema.MessageType import java.util.{Map => JMap, Properties} @@ -285,4 +287,7 @@ trait SparkShims { /** Shim method for usages from GlutenExplainUtils.scala. */ def unsetOperatorId(plan: QueryPlan[_]): Unit + + def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf: Configuration): Boolean + } diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala index 833d2385b03a..123f74770bbd 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala @@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark32 import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.gluten.expression.{ExpressionNames, Sig} import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} +import org.apache.gluten.utils.ExceptionUtils import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext, TaskContextUtils} import org.apache.spark.scheduler.TaskInfo @@ -51,7 +52,10 @@ import org.apache.spark.sql.types.{DecimalType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.parquet.crypto.ParquetCryptoRuntimeException +import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.MessageType import java.util.{HashMap => JHashMap, Map => JMap, Properties} @@ -296,4 +300,19 @@ class Spark32Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } + + override def isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + try { + ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString + false + } catch { + case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + true + case e: Throwable => + e.printStackTrace() + false + } + } } diff --git a/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala new file mode 100644 index 000000000000..6ae8f001d5c6 --- /dev/null +++ b/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.utils + +object ExceptionUtils { + + /** + * Utility to check the exception for the specified type. Parquet 1.12 does not provide direct + * utility to check for encryption. Newer versions provide utility to check encryption from read + * footer which can be used in the future once Spark brings it in. + * + * @param throwable + * Exception to check + * @param causeType + * Class of the cause to look for + * @tparam T + * Type of the cause + * @return + * True if the cause is found; false otherwise + */ + def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { + var current = throwable + while (current != null) { + if (causeType.isInstance(current)) { + return true + } + current = current.getCause + } + false + } +} diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 2135780d05fb..75821d603e79 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -377,4 +377,25 @@ class Spark33Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } + // scalastyle:off + override def isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + try { + // Attempt to read the footer + ParquetFileReader.readFooter(conf, fileStatus.getPath) + false // No exception means the file is not encrypted + } catch { + case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + println(s"DEBUG: Detected ParquetCryptoRuntimeException: ${e.getMessage}") + e.printStackTrace() // Print the full stack trace for debugging + true + case e: Throwable => + // Print general exceptions for debugging + println(s"DEBUG: Unexpected exception occurred: ${e.getClass.getName}") + println(s"DEBUG: Exception message: ${e.getMessage}") + e.printStackTrace() // Print the full stack trace + false + } + } } diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index bedad4c01741..e381c8090afa 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -512,4 +512,17 @@ class Spark34Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } -} + + + override def isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + try { + ParquetFileReader.readFooter(conf, fileStatus.getPath) + false + } catch { + case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + true + case _: Throwable => false + } + }} diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 43ed51579a1b..e9bbba457a08 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -549,4 +549,18 @@ class Spark35Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { QueryPlan.localIdMap.get().remove(plan) } + + + override def isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + try { + ParquetFileReader.readFooter(conf, fileStatus.getPath) + false + } catch { + case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + true + case _: Throwable => false + } + } } From 01ac9c558ce762cce41098411a00f020d425864a Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 10 Jan 2025 12:09:00 +0000 Subject: [PATCH 02/10] update --- .../ParquetEncryptionDetectionSuite.scala | 17 +- pom.xml | 2 +- .../apache/gluten/utils/ExceptionUtils.scala | 4 +- .../sql/shims/spark33/Spark33Shims.scala | 15 +- .../sql/shims/spark34/Spark34Shims.scala | 181 +++++++++--------- 5 files changed, 100 insertions(+), 119 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala index 9e1c26010bf9..f0851f56ae9c 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -33,17 +33,14 @@ import java.nio.charset.StandardCharsets import java.util.Base64 import scala.collection.JavaConverters._ -// scalastyle:off println class ParquetEncryptionDetectionSuite extends AnyFunSuite { - // Encryption keys private val masterKey = Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8)) private val columnKey = Base64.getEncoder.encodeToString("1234567890123456".getBytes(StandardCharsets.UTF_8)) - // Schema definition private val schema: MessageType = Types .buildMessage() .addField( @@ -54,7 +51,6 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { .named("name")) .named("TestSchema") - // Helper to write a Parquet file private def writeParquet( path: String, encryptionProperties: Option[FileEncryptionProperties], @@ -88,14 +84,12 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { } } - // Helper to get LocatedFileStatus private def getLocatedFileStatus(path: String): LocatedFileStatus = { val conf = new Configuration() val fs = FileSystem.get(conf) fs.listFiles(new Path(path), false).next() } - // Helper to create and clean temporary directories private def withTempDir(testCode: File => Any): Unit = { val tempDir = File.createTempFile("test", "").getCanonicalFile if (tempDir.exists()) { @@ -109,7 +103,6 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { } } - // Test: Detect encrypted Parquet with encrypted footer test("Detect encrypted Parquet with encrypted footer") { withTempDir { tempDir => @@ -132,7 +125,6 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { } } - // Test: Detect encrypted Parquet without encrypted footer (plaintext footer) test("Detect encrypted Parquet without encrypted footer (plaintext footer)") { withTempDir { tempDir => @@ -149,20 +141,15 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { .build() writeParquet( - "/home/user/plaintext_footer.parquet", + filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Bob"))) - val fileStatus = getLocatedFileStatus("/home/user/plaintext_footer.parquet") -// ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath) - println( - "Parquet file utils, 152, ", - SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + val fileStatus = getLocatedFileStatus(filePath) assertTrue( SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) } } - // Test: Detect plain (unencrypted) Parquet file test("Detect plain (unencrypted) Parquet file") { withTempDir { tempDir => diff --git a/pom.xml b/pom.xml index 536384d1c47d..9cdc3f2c492d 100644 --- a/pom.xml +++ b/pom.xml @@ -1367,7 +1367,7 @@ ${project.basedir}/src/test/scala ${project.basedir}/scalastyle-output.xml UTF-8 - /home/user/gluten/dev/scalastyle-config.xml + dev/scalastyle-config.xml diff --git a/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala index 6ae8f001d5c6..f86071186da4 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala @@ -19,9 +19,7 @@ package org.apache.gluten.utils object ExceptionUtils { /** - * Utility to check the exception for the specified type. Parquet 1.12 does not provide direct - * utility to check for encryption. Newer versions provide utility to check encryption from read - * footer which can be used in the future once Spark brings it in. + * Utility to check the exception for the specified type. * * @param throwable * Exception to check diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 75821d603e79..f438cea035c3 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -377,25 +377,20 @@ class Spark33Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } - // scalastyle:off override def isParquetFileEncrypted( fileStatus: LocatedFileStatus, conf: Configuration): Boolean = { try { - // Attempt to read the footer - ParquetFileReader.readFooter(conf, fileStatus.getPath) - false // No exception means the file is not encrypted + ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString + false } catch { case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => - println(s"DEBUG: Detected ParquetCryptoRuntimeException: ${e.getMessage}") - e.printStackTrace() // Print the full stack trace for debugging true case e: Throwable => - // Print general exceptions for debugging - println(s"DEBUG: Unexpected exception occurred: ${e.getClass.getName}") - println(s"DEBUG: Exception message: ${e.getMessage}") - e.printStackTrace() // Print the full stack trace + e.printStackTrace() false } } + + } diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index e381c8090afa..752107c81714 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -67,8 +67,8 @@ class Spark34Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR override def getDistribution( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression]): Seq[Distribution] = { + leftKeys: Seq[Expression], + rightKeys: Seq[Expression]): Seq[Distribution] = { ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil } @@ -99,15 +99,15 @@ class Spark34Shims extends SparkShims { } override def convertPartitionTransforms( - partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = { + partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = { CatalogUtil.convertPartitionTransforms(partitions) } override def generateFileScanRDD( - sparkSession: SparkSession, - readFunction: PartitionedFile => Iterator[InternalRow], - filePartitions: Seq[FilePartition], - fileSourceScanExec: FileSourceScanExec): FileScanRDD = { + sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + filePartitions: Seq[FilePartition], + fileSourceScanExec: FileSourceScanExec): FileScanRDD = { new FileScanRDD( sparkSession, readFunction, @@ -120,14 +120,14 @@ class Spark34Shims extends SparkShims { } override def getTextScan( - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - dataSchema: StructType, - readDataSchema: StructType, - readPartitionSchema: StructType, - options: CaseInsensitiveStringMap, - partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): TextScan = { + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + options: CaseInsensitiveStringMap, + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): TextScan = { new TextScan( sparkSession, fileIndex, @@ -140,7 +140,7 @@ class Spark34Shims extends SparkShims { } override def filesGroupedToBuckets( - selectedPartitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + selectedPartitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { selectedPartitions .flatMap { p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)) @@ -156,11 +156,11 @@ class Spark34Shims extends SparkShims { override def getBatchScanExecTable(batchScan: BatchScanExec): Table = batchScan.table override def generatePartitionedFile( - partitionValues: InternalRow, - filePath: String, - start: Long, - length: Long, - @transient locations: Array[String] = Array.empty): PartitionedFile = + partitionValues: InternalRow, + filePath: String, + start: Long, + length: Long, + @transient locations: Array[String] = Array.empty): PartitionedFile = PartitionedFile(partitionValues, SparkPath.fromPathString(filePath), start, length, locations) override def bloomFilterExpressionMappings(): Seq[Sig] = Seq( @@ -169,11 +169,11 @@ class Spark34Shims extends SparkShims { ) override def newBloomFilterAggregate[T]( - child: Expression, - estimatedNumItemsExpression: Expression, - numBitsExpression: Expression, - mutableAggBufferOffset: Int, - inputAggBufferOffset: Int): TypedImperativeAggregate[T] = { + child: Expression, + estimatedNumItemsExpression: Expression, + numBitsExpression: Expression, + mutableAggBufferOffset: Int, + inputAggBufferOffset: Int): TypedImperativeAggregate[T] = { BloomFilterAggregate( child, estimatedNumItemsExpression, @@ -183,25 +183,25 @@ class Spark34Shims extends SparkShims { } override def newMightContain( - bloomFilterExpression: Expression, - valueExpression: Expression): BinaryExpression = { + bloomFilterExpression: Expression, + valueExpression: Expression): BinaryExpression = { BloomFilterMightContain(bloomFilterExpression, valueExpression) } override def replaceBloomFilterAggregate[T]( - expr: Expression, - bloomFilterAggReplacer: ( - Expression, - Expression, - Expression, - Int, - Int) => TypedImperativeAggregate[T]): Expression = expr match { + expr: Expression, + bloomFilterAggReplacer: ( + Expression, + Expression, + Expression, + Int, + Int) => TypedImperativeAggregate[T]): Expression = expr match { case BloomFilterAggregate( - child, - estimatedNumItemsExpression, - numBitsExpression, - mutableAggBufferOffset, - inputAggBufferOffset) => + child, + estimatedNumItemsExpression, + numBitsExpression, + mutableAggBufferOffset, + inputAggBufferOffset) => bloomFilterAggReplacer( child, estimatedNumItemsExpression, @@ -212,21 +212,21 @@ class Spark34Shims extends SparkShims { } override def replaceMightContain[T]( - expr: Expression, - mightContainReplacer: (Expression, Expression) => BinaryExpression): Expression = expr match { + expr: Expression, + mightContainReplacer: (Expression, Expression) => BinaryExpression): Expression = expr match { case BloomFilterMightContain(bloomFilterExpression, valueExpression) => mightContainReplacer(bloomFilterExpression, valueExpression) case other => other } override def getFileSizeAndModificationTime( - file: PartitionedFile): (Option[Long], Option[Long]) = { + file: PartitionedFile): (Option[Long], Option[Long]) = { (Some(file.fileSize), Some(file.modificationTime)) } override def generateMetadataColumns( - file: PartitionedFile, - metadataColumnNames: Seq[String]): JMap[String, String] = { + file: PartitionedFile, + metadataColumnNames: Seq[String]): JMap[String, String] = { val metadataColumn = new JHashMap[String, String]() val path = new Path(file.filePath.toString) for (columnName <- metadataColumnNames) { @@ -282,13 +282,13 @@ class Spark34Shims extends SparkShims { override def getExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] = List() override def writeFilesExecuteTask( - description: WriteJobDescription, - jobTrackerID: String, - sparkStageId: Int, - sparkPartitionId: Int, - sparkAttemptNumber: Int, - committer: FileCommitProtocol, - iterator: Iterator[InternalRow]): WriteTaskResult = { + description: WriteJobDescription, + jobTrackerID: String, + sparkStageId: Int, + sparkPartitionId: Int, + sparkAttemptNumber: Int, + committer: FileCommitProtocol, + iterator: Iterator[InternalRow]): WriteTaskResult = { GlutenFileFormatWriter.writeFilesExecuteTask( description, jobTrackerID, @@ -311,8 +311,8 @@ class Spark34Shims extends SparkShims { } def setJobDescriptionOrTagForBroadcastExchange( - sc: SparkContext, - broadcastExchange: BroadcastExchangeLike): Unit = { + sc: SparkContext, + broadcastExchange: BroadcastExchangeLike): Unit = { // Setup a job group here so later it may get cancelled by groupId if necessary. sc.setJobGroup( broadcastExchange.runId.toString, @@ -321,17 +321,17 @@ class Spark34Shims extends SparkShims { } def cancelJobGroupForBroadcastExchange( - sc: SparkContext, - broadcastExchange: BroadcastExchangeLike): Unit = { + sc: SparkContext, + broadcastExchange: BroadcastExchangeLike): Unit = { sc.cancelJobGroup(broadcastExchange.runId.toString) } override def getShuffleReaderParam[K, C]( - handle: ShuffleHandle, - startMapIndex: Int, - endMapIndex: Int, - startPartition: Int, - endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = { + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = { ShuffleUtils.getReaderParam(handle, startMapIndex, endMapIndex, startPartition, endPartition) } @@ -344,14 +344,14 @@ class Spark34Shims extends SparkShims { def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files def isFileSplittable( - relation: HadoopFsRelation, - filePath: Path, - sparkSchema: StructType): Boolean = { + relation: HadoopFsRelation, + filePath: Path, + sparkSchema: StructType): Boolean = { // SPARK-39634: Allow file splitting in combination with row index generation once // the fix for PARQUET-2161 is available. relation.fileFormat .isSplitable(relation.sparkSession, relation.options, filePath) && - !(RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema) >= 0) + !(RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema) >= 0) } def isRowIndexMetadataColumn(name: String): Boolean = { @@ -375,12 +375,12 @@ class Spark34Shims extends SparkShims { } def splitFiles( - sparkSession: SparkSession, - file: FileStatus, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { + sparkSession: SparkSession, + file: FileStatus, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { PartitionedFileUtil.splitFiles( sparkSession, file, @@ -409,15 +409,15 @@ class Spark34Shims extends SparkShims { } override def getCommonPartitionValues( - batchScan: BatchScanExec): Option[Seq[(InternalRow, Int)]] = { + batchScan: BatchScanExec): Option[Seq[(InternalRow, Int)]] = { batchScan.commonPartitionValues } override def orderPartitions( - scan: Scan, - keyGroupedPartitioning: Option[Seq[Expression]], - filteredPartitions: Seq[Seq[InputPartition]], - outputPartitioning: Partitioning): Seq[InputPartition] = { + scan: Scan, + keyGroupedPartitioning: Option[Seq[Expression]], + filteredPartitions: Seq[Seq[InputPartition]], + outputPartitioning: Partitioning): Seq[InputPartition] = { scan match { case _ if keyGroupedPartitioning.isDefined => var newPartitions = filteredPartitions @@ -468,12 +468,12 @@ class Spark34Shims extends SparkShims { } override def dateTimestampFormatInReadIsDefaultValue( - csvOptions: CSVOptions, - timeZone: String): Boolean = { + csvOptions: CSVOptions, + timeZone: String): Boolean = { val default = new CSVOptions(CaseInsensitiveMap(Map()), csvOptions.columnPruning, timeZone) csvOptions.dateFormatInRead == default.dateFormatInRead && - csvOptions.timestampFormatInRead == default.timestampFormatInRead && - csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead + csvOptions.timestampFormatInRead == default.timestampFormatInRead && + csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead } override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = { @@ -481,9 +481,9 @@ class Spark34Shims extends SparkShims { } override def createParquetFilters( - conf: SQLConf, - schema: MessageType, - caseSensitive: Option[Boolean] = None): ParquetFilters = { + conf: SQLConf, + schema: MessageType, + caseSensitive: Option[Boolean] = None): ParquetFilters = { new ParquetFilters( schema, conf.parquetFilterPushDownDate, @@ -512,17 +512,18 @@ class Spark34Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } - - override def isParquetFileEncrypted( - fileStatus: LocatedFileStatus, - conf: Configuration): Boolean = { + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { try { - ParquetFileReader.readFooter(conf, fileStatus.getPath) + ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString false } catch { case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => true - case _: Throwable => false + case e: Throwable => + e.printStackTrace() + false } - }} + } +} \ No newline at end of file From 37ac738a61b771828ef934936a148ce5bbbc1432 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 10 Jan 2025 12:48:49 +0000 Subject: [PATCH 03/10] format --- .../ParquetEncryptionDetectionSuite.scala | 21 ++- .../sql/shims/spark33/Spark33Shims.scala | 5 +- .../sql/shims/spark34/Spark34Shims.scala | 168 +++++++++--------- .../sql/shims/spark35/Spark35Shims.scala | 5 +- 4 files changed, 105 insertions(+), 94 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala index f0851f56ae9c..f0338b740aea 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -34,6 +34,22 @@ import java.util.Base64 import scala.collection.JavaConverters._ +/** + * This suite attempt to test parquet encryption for fallback of scan operator. Will check the + * following: + * 1. Plain Parquet File: + * - Writes a Parquet file with no encryption. + * - Asserts that parquet is not encrypted + * + * 2. Encrypted Parquet File (with encrypted footer): + * - Writes a Parquet file with column-level encryption and an encrypted footer. + * - Asserts that the file is encrypted. + * + * 3. Encrypted Parquet File (with plaintext footer): + * - Writes a Parquet file with column-level encryption but a plaintext (unencrypted) footer. + * - Ensures the file is still detected as encrypted despite the plaintext footer. + */ + class ParquetEncryptionDetectionSuite extends AnyFunSuite { private val masterKey = @@ -140,10 +156,7 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { .withPlaintextFooter() .build() - writeParquet( - filePath, - Some(encryptionProps), - Seq(Map("id" -> 1, "name" -> "Bob"))) + writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Bob"))) val fileStatus = getLocatedFileStatus(filePath) assertTrue( SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index f438cea035c3..4f052cc2c19e 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -378,8 +378,8 @@ class Spark33Shims extends SparkShims { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } override def isParquetFileEncrypted( - fileStatus: LocatedFileStatus, - conf: Configuration): Boolean = { + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { try { ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString false @@ -392,5 +392,4 @@ class Spark33Shims extends SparkShims { } } - } diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 752107c81714..7f8138b353b0 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -67,8 +67,8 @@ class Spark34Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR override def getDistribution( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression]): Seq[Distribution] = { + leftKeys: Seq[Expression], + rightKeys: Seq[Expression]): Seq[Distribution] = { ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil } @@ -99,15 +99,15 @@ class Spark34Shims extends SparkShims { } override def convertPartitionTransforms( - partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = { + partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = { CatalogUtil.convertPartitionTransforms(partitions) } override def generateFileScanRDD( - sparkSession: SparkSession, - readFunction: PartitionedFile => Iterator[InternalRow], - filePartitions: Seq[FilePartition], - fileSourceScanExec: FileSourceScanExec): FileScanRDD = { + sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + filePartitions: Seq[FilePartition], + fileSourceScanExec: FileSourceScanExec): FileScanRDD = { new FileScanRDD( sparkSession, readFunction, @@ -120,14 +120,14 @@ class Spark34Shims extends SparkShims { } override def getTextScan( - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - dataSchema: StructType, - readDataSchema: StructType, - readPartitionSchema: StructType, - options: CaseInsensitiveStringMap, - partitionFilters: Seq[Expression], - dataFilters: Seq[Expression]): TextScan = { + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + options: CaseInsensitiveStringMap, + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): TextScan = { new TextScan( sparkSession, fileIndex, @@ -140,7 +140,7 @@ class Spark34Shims extends SparkShims { } override def filesGroupedToBuckets( - selectedPartitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { + selectedPartitions: Array[PartitionDirectory]): Map[Int, Array[PartitionedFile]] = { selectedPartitions .flatMap { p => p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)) @@ -156,11 +156,11 @@ class Spark34Shims extends SparkShims { override def getBatchScanExecTable(batchScan: BatchScanExec): Table = batchScan.table override def generatePartitionedFile( - partitionValues: InternalRow, - filePath: String, - start: Long, - length: Long, - @transient locations: Array[String] = Array.empty): PartitionedFile = + partitionValues: InternalRow, + filePath: String, + start: Long, + length: Long, + @transient locations: Array[String] = Array.empty): PartitionedFile = PartitionedFile(partitionValues, SparkPath.fromPathString(filePath), start, length, locations) override def bloomFilterExpressionMappings(): Seq[Sig] = Seq( @@ -169,11 +169,11 @@ class Spark34Shims extends SparkShims { ) override def newBloomFilterAggregate[T]( - child: Expression, - estimatedNumItemsExpression: Expression, - numBitsExpression: Expression, - mutableAggBufferOffset: Int, - inputAggBufferOffset: Int): TypedImperativeAggregate[T] = { + child: Expression, + estimatedNumItemsExpression: Expression, + numBitsExpression: Expression, + mutableAggBufferOffset: Int, + inputAggBufferOffset: Int): TypedImperativeAggregate[T] = { BloomFilterAggregate( child, estimatedNumItemsExpression, @@ -183,25 +183,25 @@ class Spark34Shims extends SparkShims { } override def newMightContain( - bloomFilterExpression: Expression, - valueExpression: Expression): BinaryExpression = { + bloomFilterExpression: Expression, + valueExpression: Expression): BinaryExpression = { BloomFilterMightContain(bloomFilterExpression, valueExpression) } override def replaceBloomFilterAggregate[T]( - expr: Expression, - bloomFilterAggReplacer: ( - Expression, - Expression, - Expression, - Int, - Int) => TypedImperativeAggregate[T]): Expression = expr match { + expr: Expression, + bloomFilterAggReplacer: ( + Expression, + Expression, + Expression, + Int, + Int) => TypedImperativeAggregate[T]): Expression = expr match { case BloomFilterAggregate( - child, - estimatedNumItemsExpression, - numBitsExpression, - mutableAggBufferOffset, - inputAggBufferOffset) => + child, + estimatedNumItemsExpression, + numBitsExpression, + mutableAggBufferOffset, + inputAggBufferOffset) => bloomFilterAggReplacer( child, estimatedNumItemsExpression, @@ -212,21 +212,21 @@ class Spark34Shims extends SparkShims { } override def replaceMightContain[T]( - expr: Expression, - mightContainReplacer: (Expression, Expression) => BinaryExpression): Expression = expr match { + expr: Expression, + mightContainReplacer: (Expression, Expression) => BinaryExpression): Expression = expr match { case BloomFilterMightContain(bloomFilterExpression, valueExpression) => mightContainReplacer(bloomFilterExpression, valueExpression) case other => other } override def getFileSizeAndModificationTime( - file: PartitionedFile): (Option[Long], Option[Long]) = { + file: PartitionedFile): (Option[Long], Option[Long]) = { (Some(file.fileSize), Some(file.modificationTime)) } override def generateMetadataColumns( - file: PartitionedFile, - metadataColumnNames: Seq[String]): JMap[String, String] = { + file: PartitionedFile, + metadataColumnNames: Seq[String]): JMap[String, String] = { val metadataColumn = new JHashMap[String, String]() val path = new Path(file.filePath.toString) for (columnName <- metadataColumnNames) { @@ -282,13 +282,13 @@ class Spark34Shims extends SparkShims { override def getExtendedColumnarPostRules(): List[SparkSession => Rule[SparkPlan]] = List() override def writeFilesExecuteTask( - description: WriteJobDescription, - jobTrackerID: String, - sparkStageId: Int, - sparkPartitionId: Int, - sparkAttemptNumber: Int, - committer: FileCommitProtocol, - iterator: Iterator[InternalRow]): WriteTaskResult = { + description: WriteJobDescription, + jobTrackerID: String, + sparkStageId: Int, + sparkPartitionId: Int, + sparkAttemptNumber: Int, + committer: FileCommitProtocol, + iterator: Iterator[InternalRow]): WriteTaskResult = { GlutenFileFormatWriter.writeFilesExecuteTask( description, jobTrackerID, @@ -311,8 +311,8 @@ class Spark34Shims extends SparkShims { } def setJobDescriptionOrTagForBroadcastExchange( - sc: SparkContext, - broadcastExchange: BroadcastExchangeLike): Unit = { + sc: SparkContext, + broadcastExchange: BroadcastExchangeLike): Unit = { // Setup a job group here so later it may get cancelled by groupId if necessary. sc.setJobGroup( broadcastExchange.runId.toString, @@ -321,17 +321,17 @@ class Spark34Shims extends SparkShims { } def cancelJobGroupForBroadcastExchange( - sc: SparkContext, - broadcastExchange: BroadcastExchangeLike): Unit = { + sc: SparkContext, + broadcastExchange: BroadcastExchangeLike): Unit = { sc.cancelJobGroup(broadcastExchange.runId.toString) } override def getShuffleReaderParam[K, C]( - handle: ShuffleHandle, - startMapIndex: Int, - endMapIndex: Int, - startPartition: Int, - endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = { + handle: ShuffleHandle, + startMapIndex: Int, + endMapIndex: Int, + startPartition: Int, + endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = { ShuffleUtils.getReaderParam(handle, startMapIndex, endMapIndex, startPartition, endPartition) } @@ -344,14 +344,14 @@ class Spark34Shims extends SparkShims { def getFileStatus(partition: PartitionDirectory): Seq[FileStatus] = partition.files def isFileSplittable( - relation: HadoopFsRelation, - filePath: Path, - sparkSchema: StructType): Boolean = { + relation: HadoopFsRelation, + filePath: Path, + sparkSchema: StructType): Boolean = { // SPARK-39634: Allow file splitting in combination with row index generation once // the fix for PARQUET-2161 is available. relation.fileFormat .isSplitable(relation.sparkSession, relation.options, filePath) && - !(RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema) >= 0) + !(RowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema) >= 0) } def isRowIndexMetadataColumn(name: String): Boolean = { @@ -375,12 +375,12 @@ class Spark34Shims extends SparkShims { } def splitFiles( - sparkSession: SparkSession, - file: FileStatus, - filePath: Path, - isSplitable: Boolean, - maxSplitBytes: Long, - partitionValues: InternalRow): Seq[PartitionedFile] = { + sparkSession: SparkSession, + file: FileStatus, + filePath: Path, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { PartitionedFileUtil.splitFiles( sparkSession, file, @@ -409,15 +409,15 @@ class Spark34Shims extends SparkShims { } override def getCommonPartitionValues( - batchScan: BatchScanExec): Option[Seq[(InternalRow, Int)]] = { + batchScan: BatchScanExec): Option[Seq[(InternalRow, Int)]] = { batchScan.commonPartitionValues } override def orderPartitions( - scan: Scan, - keyGroupedPartitioning: Option[Seq[Expression]], - filteredPartitions: Seq[Seq[InputPartition]], - outputPartitioning: Partitioning): Seq[InputPartition] = { + scan: Scan, + keyGroupedPartitioning: Option[Seq[Expression]], + filteredPartitions: Seq[Seq[InputPartition]], + outputPartitioning: Partitioning): Seq[InputPartition] = { scan match { case _ if keyGroupedPartitioning.isDefined => var newPartitions = filteredPartitions @@ -468,12 +468,12 @@ class Spark34Shims extends SparkShims { } override def dateTimestampFormatInReadIsDefaultValue( - csvOptions: CSVOptions, - timeZone: String): Boolean = { + csvOptions: CSVOptions, + timeZone: String): Boolean = { val default = new CSVOptions(CaseInsensitiveMap(Map()), csvOptions.columnPruning, timeZone) csvOptions.dateFormatInRead == default.dateFormatInRead && - csvOptions.timestampFormatInRead == default.timestampFormatInRead && - csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead + csvOptions.timestampFormatInRead == default.timestampFormatInRead && + csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead } override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = { @@ -481,9 +481,9 @@ class Spark34Shims extends SparkShims { } override def createParquetFilters( - conf: SQLConf, - schema: MessageType, - caseSensitive: Option[Boolean] = None): ParquetFilters = { + conf: SQLConf, + schema: MessageType, + caseSensitive: Option[Boolean] = None): ParquetFilters = { new ParquetFilters( schema, conf.parquetFilterPushDownDate, @@ -526,4 +526,4 @@ class Spark34Shims extends SparkShims { false } } -} \ No newline at end of file +} diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index e9bbba457a08..e79d15a1a0b7 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -550,10 +550,9 @@ class Spark35Shims extends SparkShims { QueryPlan.localIdMap.get().remove(plan) } - override def isParquetFileEncrypted( - fileStatus: LocatedFileStatus, - conf: Configuration): Boolean = { + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { try { ParquetFileReader.readFooter(conf, fileStatus.getPath) false From 3546f10305dfa29fb50ac129747ee3a3119b4769 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 10 Jan 2025 13:40:00 +0000 Subject: [PATCH 04/10] update --- .../sql/shims/spark33/Spark33Shims.scala | 6 ++- .../apache/gluten/utils/ExceptionUtils.scala | 43 +++++++++++++++++++ .../sql/shims/spark34/Spark34Shims.scala | 5 ++- .../apache/gluten/utils/ExceptionUtils.scala | 43 +++++++++++++++++++ .../sql/shims/spark35/Spark35Shims.scala | 9 +--- 5 files changed, 96 insertions(+), 10 deletions(-) create mode 100644 shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala create mode 100644 shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 4f052cc2c19e..aba794a161c9 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -20,6 +20,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.gluten.expression.{ExpressionNames, Sig} import org.apache.gluten.expression.ExpressionNames.{CEIL, FLOOR, KNOWN_NULLABLE, TIMESTAMP_ADD} import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} +import org.apache.gluten.utils.ExceptionUtils import org.apache.spark._ import org.apache.spark.scheduler.TaskInfo @@ -53,7 +54,10 @@ import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.parquet.crypto.ParquetCryptoRuntimeException +import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.MessageType import java.time.ZoneOffset diff --git a/shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala new file mode 100644 index 000000000000..f86071186da4 --- /dev/null +++ b/shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.utils + +object ExceptionUtils { + + /** + * Utility to check the exception for the specified type. + * + * @param throwable + * Exception to check + * @param causeType + * Class of the cause to look for + * @tparam T + * Type of the cause + * @return + * True if the cause is found; false otherwise + */ + def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { + var current = throwable + while (current != null) { + if (causeType.isInstance(current)) { + return true + } + current = current.getCause + } + false + } +} diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 7f8138b353b0..3d69296f7305 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -55,7 +55,10 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.parquet.crypto.ParquetCryptoRuntimeException +import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.MessageType import java.time.ZoneOffset diff --git a/shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala new file mode 100644 index 000000000000..f86071186da4 --- /dev/null +++ b/shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.utils + +object ExceptionUtils { + + /** + * Utility to check the exception for the specified type. + * + * @param throwable + * Exception to check + * @param causeType + * Class of the cause to look for + * @tparam T + * Type of the cause + * @return + * True if the cause is found; false otherwise + */ + def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { + var current = throwable + while (current != null) { + if (causeType.isInstance(current)) { + return true + } + current = current.getCause + } + false + } +} diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index e79d15a1a0b7..13e230cb245a 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -553,13 +553,6 @@ class Spark35Shims extends SparkShims { override def isParquetFileEncrypted( fileStatus: LocatedFileStatus, conf: Configuration): Boolean = { - try { - ParquetFileReader.readFooter(conf, fileStatus.getPath) - false - } catch { - case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => - true - case _: Throwable => false - } + return false } } From ef430a67b933cd6855b49ed4baf248af4a393afb Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 10 Jan 2025 14:11:40 +0000 Subject: [PATCH 05/10] update --- .../org/apache/gluten/sql/shims/spark34/Spark34Shims.scala | 1 + .../org/apache/gluten/sql/shims/spark35/Spark35Shims.scala | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index 3d69296f7305..62a4afe1060d 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark34 import org.apache.gluten.expression.{ExpressionNames, Sig} import org.apache.gluten.expression.ExpressionNames.KNOWN_NULLABLE import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} +import org.apache.gluten.utils.ExceptionUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 13e230cb245a..d6abe8580ba7 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -55,7 +55,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} import org.apache.parquet.schema.MessageType import java.time.ZoneOffset From faa6b95e435c5843f5e0b1819942356973d3fd99 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 10 Jan 2025 15:19:00 +0000 Subject: [PATCH 06/10] mend update --- .../org/apache/gluten/utils/velox/VeloxTestSettings.scala | 3 ++- .../spark/sql/gluten}/ParquetEncryptionDetectionSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) rename {backends-velox/src/test/scala/org/apache/gluten/utils => gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten}/ParquetEncryptionDetectionSuite.scala (99%) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 72b77ae1f95b..225bf97ebf27 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2Strategy import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} -import org.apache.spark.sql.gluten.GlutenFallbackSuite +import org.apache.spark.sql.gluten.{GlutenFallbackSuite, ParquetEncryptionDetectionSuite} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ @@ -1177,6 +1177,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenImplicitsTest] enableSuite[GlutenCollapseProjectExecTransformerSuite] enableSuite[GlutenSparkSessionExtensionSuite] + enableSuite[ParquetEncryptionDetectionSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/ParquetEncryptionDetectionSuite.scala similarity index 99% rename from backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala rename to gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/ParquetEncryptionDetectionSuite.scala index f0338b740aea..f709f218b980 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/ParquetEncryptionDetectionSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gluten.utils +package org.apache.spark.sql.gluten import org.apache.gluten.sql.shims.SparkShimLoader From 44befbdd6b6093f4cb2f23492e380d23b360819e Mon Sep 17 00:00:00 2001 From: arnavb Date: Mon, 13 Jan 2025 22:29:36 +0000 Subject: [PATCH 07/10] update --- .../ParquetEncryptionDetectionSuite.scala | 48 +++++++++++-------- .../sql/shims/spark35/Spark35Shims.scala | 1 + 2 files changed, 29 insertions(+), 20 deletions(-) rename {gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten => backends-velox/src/test/scala/org/apache/gluten/utils}/ParquetEncryptionDetectionSuite.scala (86%) diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala similarity index 86% rename from gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/ParquetEncryptionDetectionSuite.scala rename to backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala index f709f218b980..b7ceeb7680bf 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/ParquetEncryptionDetectionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -14,10 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.gluten +package org.apache.gluten.utils import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.spark.sql.{GlutenQueryTest, SparkSession} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} import org.apache.parquet.crypto.{ColumnEncryptionProperties, FileEncryptionProperties} @@ -26,9 +28,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter import org.apache.parquet.hadoop.metadata.ColumnPath import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types} import org.junit.Assert._ -import org.scalatest.funsuite.AnyFunSuite -import java.io.File import java.nio.charset.StandardCharsets import java.util.Base64 @@ -50,7 +50,7 @@ import scala.collection.JavaConverters._ * - Ensures the file is still detected as encrypted despite the plaintext footer. */ -class ParquetEncryptionDetectionSuite extends AnyFunSuite { +class ParquetEncryptionDetectionSuite extends GlutenQueryTest { private val masterKey = Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8)) @@ -67,6 +67,10 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { .named("name")) .named("TestSchema") + private var _spark: SparkSession = _ + + override protected def spark: SparkSession = _spark + private def writeParquet( path: String, encryptionProperties: Option[FileEncryptionProperties], @@ -106,20 +110,22 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { fs.listFiles(new Path(path), false).next() } - private def withTempDir(testCode: File => Any): Unit = { - val tempDir = File.createTempFile("test", "").getCanonicalFile - if (tempDir.exists()) { - tempDir.delete() - } - tempDir.mkdir() - try { - testCode(tempDir) - } finally { - tempDir.delete() - } - } - - test("Detect encrypted Parquet with encrypted footer") { +// private def withTempDir(testCode: File => Any): Unit = { +// val tempDir = File.createTempFile("test", "").getCanonicalFile +// if (tempDir.exists()) { +// tempDir.delete() +// } +// tempDir.mkdir() +// try { +// testCode(tempDir) +// } finally { +// tempDir.delete() +// } +// } + + testWithSpecifiedSparkVersion( + "Detect encrypted Parquet with encrypted footer", + Array("3.2", "3.3", "3.4")) { withTempDir { tempDir => val filePath = s"${tempDir.getAbsolutePath}/encrypted_footer.parquet" @@ -141,7 +147,9 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { } } - test("Detect encrypted Parquet without encrypted footer (plaintext footer)") { + testWithSpecifiedSparkVersion( + "Detect encrypted Parquet without encrypted footer (plaintext footer)", + Array("3.2", "3.3")) { withTempDir { tempDir => val filePath = s"${tempDir.getAbsolutePath}/plaintext_footer.parquet" @@ -163,7 +171,7 @@ class ParquetEncryptionDetectionSuite extends AnyFunSuite { } } - test("Detect plain (unencrypted) Parquet file") { + testWithSpecifiedSparkVersion("Detect plain (unencrypted) Parquet file", Array("3.3", "3.4")) { withTempDir { tempDir => val filePath = s"${tempDir.getAbsolutePath}/plain.parquet" diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index d6abe8580ba7..7b272ce9939e 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -554,6 +554,7 @@ class Spark35Shims extends SparkShims { override def isParquetFileEncrypted( fileStatus: LocatedFileStatus, conf: Configuration): Boolean = { + // TODO: Support will be added (https://github.com/apache/incubator-gluten/pull/8501) return false } } From 54a212eb04bb63913c0854f1af91f4e28a96dacc Mon Sep 17 00:00:00 2001 From: arnavb Date: Tue, 14 Jan 2025 05:44:06 +0000 Subject: [PATCH 08/10] update --- .../utils/velox/VeloxTestSettings.scala | 3 +- .../apache/gluten/utils/ExceptionUtils.scala | 0 .../apache/gluten/utils/ExceptionUtils.scala | 43 ------------------- .../apache/gluten/utils/ExceptionUtils.scala | 43 ------------------- 4 files changed, 1 insertion(+), 88 deletions(-) rename shims/{spark32 => common}/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala (100%) delete mode 100644 shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala delete mode 100644 shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 225bf97ebf27..72b77ae1f95b 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2Strategy import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} -import org.apache.spark.sql.gluten.{GlutenFallbackSuite, ParquetEncryptionDetectionSuite} +import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ @@ -1177,7 +1177,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenImplicitsTest] enableSuite[GlutenCollapseProjectExecTransformerSuite] enableSuite[GlutenSparkSessionExtensionSuite] - enableSuite[ParquetEncryptionDetectionSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala similarity index 100% rename from shims/spark32/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala rename to shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala diff --git a/shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala deleted file mode 100644 index f86071186da4..000000000000 --- a/shims/spark33/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.utils - -object ExceptionUtils { - - /** - * Utility to check the exception for the specified type. - * - * @param throwable - * Exception to check - * @param causeType - * Class of the cause to look for - * @tparam T - * Type of the cause - * @return - * True if the cause is found; false otherwise - */ - def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { - var current = throwable - while (current != null) { - if (causeType.isInstance(current)) { - return true - } - current = current.getCause - } - false - } -} diff --git a/shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala deleted file mode 100644 index f86071186da4..000000000000 --- a/shims/spark34/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.utils - -object ExceptionUtils { - - /** - * Utility to check the exception for the specified type. - * - * @param throwable - * Exception to check - * @param causeType - * Class of the cause to look for - * @tparam T - * Type of the cause - * @return - * True if the cause is found; false otherwise - */ - def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { - var current = throwable - while (current != null) { - if (causeType.isInstance(current)) { - return true - } - current = current.getCause - } - false - } -} From c78605c22687a90cc3e085f2d90a97c611ef6a83 Mon Sep 17 00:00:00 2001 From: arnavb Date: Wed, 15 Jan 2025 06:34:17 +0000 Subject: [PATCH 09/10] update --- .../utils/ParquetEncryptionDetectionSuite.scala | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala index b7ceeb7680bf..b7a134260059 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -110,19 +110,6 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest { fs.listFiles(new Path(path), false).next() } -// private def withTempDir(testCode: File => Any): Unit = { -// val tempDir = File.createTempFile("test", "").getCanonicalFile -// if (tempDir.exists()) { -// tempDir.delete() -// } -// tempDir.mkdir() -// try { -// testCode(tempDir) -// } finally { -// tempDir.delete() -// } -// } - testWithSpecifiedSparkVersion( "Detect encrypted Parquet with encrypted footer", Array("3.2", "3.3", "3.4")) { @@ -149,7 +136,7 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest { testWithSpecifiedSparkVersion( "Detect encrypted Parquet without encrypted footer (plaintext footer)", - Array("3.2", "3.3")) { + Array("3.2", "3.3", "3.4")) { withTempDir { tempDir => val filePath = s"${tempDir.getAbsolutePath}/plaintext_footer.parquet" @@ -171,7 +158,7 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest { } } - testWithSpecifiedSparkVersion("Detect plain (unencrypted) Parquet file", Array("3.3", "3.4")) { + testWithSpecifiedSparkVersion("Detect plain (unencrypted) Parquet file", Array("3.2", "3.3", "3.4")) { withTempDir { tempDir => val filePath = s"${tempDir.getAbsolutePath}/plain.parquet" From 415c09e848e3ae1d10592704888b66737ea74938 Mon Sep 17 00:00:00 2001 From: arnavb Date: Wed, 15 Jan 2025 09:22:11 +0000 Subject: [PATCH 10/10] update --- .../apache/gluten/utils/ParquetEncryptionDetectionSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala index b7a134260059..db53c329f6f5 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -158,7 +158,9 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest { } } - testWithSpecifiedSparkVersion("Detect plain (unencrypted) Parquet file", Array("3.2", "3.3", "3.4")) { + testWithSpecifiedSparkVersion( + "Detect plain (unencrypted) Parquet file", + Array("3.2", "3.3", "3.4")) { withTempDir { tempDir => val filePath = s"${tempDir.getAbsolutePath}/plain.parquet"