diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index 6a96789d512e..9f43575cf90c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -17,6 +17,7 @@ package org.apache.gluten.utils import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat @@ -24,8 +25,6 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator} -import org.apache.parquet.crypto.ParquetCryptoRuntimeException -import org.apache.parquet.hadoop.ParquetFileReader object ParquetMetadataUtils { @@ -98,38 +97,9 @@ object ParquetMetadataUtils { while (filesIterator.hasNext && checkedFileCount < fileLimit) { val fileStatus = filesIterator.next() checkedFileCount += 1 - try { - ParquetFileReader.readFooter(conf, fileStatus.getPath).toString - } catch { - case e: Exception if hasCause(e, classOf[ParquetCryptoRuntimeException]) => - return true - case e: Exception => - } - } - false - } - - /** - * Utility to check the exception for the specified type. Parquet 1.12 does not provide direct - * utility to check for encryption. Newer versions provide utility to check encryption from read - * footer which can be used in the future once Spark brings it in. 
- * - * @param throwable - * Exception to check - * @param causeType - * Class of the cause to look for - * @tparam T - * Type of the cause - * @return - * True if the cause is found; false otherwise - */ - private def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { - var current = throwable - while (current != null) { - if (causeType.isInstance(current)) { + if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, conf)) { return true } - current = current.getCause } false } diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala new file mode 100644 index 000000000000..db53c329f6f5 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.utils + +import org.apache.gluten.sql.shims.SparkShimLoader + +import org.apache.spark.sql.{GlutenQueryTest, SparkSession} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} +import org.apache.parquet.crypto.{ColumnEncryptionProperties, FileEncryptionProperties} +import org.apache.parquet.example.data.simple.SimpleGroup +import org.apache.parquet.hadoop.example.ExampleParquetWriter +import org.apache.parquet.hadoop.metadata.ColumnPath +import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types} +import org.junit.Assert._ + +import java.nio.charset.StandardCharsets +import java.util.Base64 + +import scala.collection.JavaConverters._ + +/** + * This suite attempts to test Parquet encryption for fallback of the scan operator. It checks the + * following: + * 1. Plain Parquet File: + * - Writes a Parquet file with no encryption. + * - Asserts that the Parquet file is not encrypted. + * + * 2. Encrypted Parquet File (with encrypted footer): + * - Writes a Parquet file with column-level encryption and an encrypted footer. + * - Asserts that the file is encrypted. + * + * 3. Encrypted Parquet File (with plaintext footer): + * - Writes a Parquet file with column-level encryption but a plaintext (unencrypted) footer. + * - Ensures the file is still detected as encrypted despite the plaintext footer.
+ */ + +class ParquetEncryptionDetectionSuite extends GlutenQueryTest { + + private val masterKey = + Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8)) + private val columnKey = + Base64.getEncoder.encodeToString("1234567890123456".getBytes(StandardCharsets.UTF_8)) + + private val schema: MessageType = Types + .buildMessage() + .addField( + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED).named("id")) + .addField( + Types + .primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) + .named("name")) + .named("TestSchema") + + private var _spark: SparkSession = _ + + override protected def spark: SparkSession = _spark + + private def writeParquet( + path: String, + encryptionProperties: Option[FileEncryptionProperties], + data: Seq[Map[String, Any]] + ): Unit = { + val configuration = new Configuration() + val writerBuilder = ExampleParquetWriter + .builder(new Path(path)) + .withConf(configuration) + .withType(schema) + .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE) + + encryptionProperties.foreach(writerBuilder.withEncryption) + + val writer = writerBuilder.build() + try { + data.foreach { + row => + val group = new SimpleGroup(schema) + row.foreach { + case (key, value) => + value match { + case i: Int => group.add(key, i) + case s: String => group.add(key, s) + } + } + writer.write(group) + } + } finally { + writer.close() + } + } + + private def getLocatedFileStatus(path: String): LocatedFileStatus = { + val conf = new Configuration() + val fs = FileSystem.get(conf) + fs.listFiles(new Path(path), false).next() + } + + testWithSpecifiedSparkVersion( + "Detect encrypted Parquet with encrypted footer", + Array("3.2", "3.3", "3.4")) { + withTempDir { + tempDir => + val filePath = s"${tempDir.getAbsolutePath}/encrypted_footer.parquet" + val encryptionProps = FileEncryptionProperties + .builder(Base64.getDecoder.decode(masterKey)) + .withEncryptedColumns( 
+ Map( + ColumnPath.get("name") -> ColumnEncryptionProperties + .builder(ColumnPath.get("name")) + .withKey(Base64.getDecoder.decode(columnKey)) + .build()).asJava) + .build() + + writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Alice"))) + val fileStatus = getLocatedFileStatus(filePath) + + assertTrue( + SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + } + } + + testWithSpecifiedSparkVersion( + "Detect encrypted Parquet without encrypted footer (plaintext footer)", + Array("3.2", "3.3", "3.4")) { + withTempDir { + tempDir => + val filePath = s"${tempDir.getAbsolutePath}/plaintext_footer.parquet" + val encryptionProps = FileEncryptionProperties + .builder(Base64.getDecoder.decode(masterKey)) + .withEncryptedColumns( + Map( + ColumnPath.get("name") -> ColumnEncryptionProperties + .builder(ColumnPath.get("name")) + .withKey(Base64.getDecoder.decode(columnKey)) + .build()).asJava) + .withPlaintextFooter() + .build() + + writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Bob"))) + val fileStatus = getLocatedFileStatus(filePath) + assertTrue( + SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + } + } + + testWithSpecifiedSparkVersion( + "Detect plain (unencrypted) Parquet file", + Array("3.2", "3.3", "3.4")) { + withTempDir { + tempDir => + val filePath = s"${tempDir.getAbsolutePath}/plain.parquet" + + writeParquet(filePath, None, Seq(Map("id" -> 1, "name" -> "Charlie"))) + val fileStatus = getLocatedFileStatus(filePath) + + assertFalse( + SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())) + } + } +} diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala index fba6a4a5a48a..681e0f583d5d 100644 --- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala +++ 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala @@ -48,7 +48,9 @@ import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.LocatedFileStatus import org.apache.parquet.schema.MessageType import java.util.{Map => JMap, Properties} @@ -285,4 +287,7 @@ trait SparkShims { /** Shim method for usages from GlutenExplainUtils.scala. */ def unsetOperatorId(plan: QueryPlan[_]): Unit + + def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf: Configuration): Boolean + } diff --git a/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala b/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala new file mode 100644 index 000000000000..f86071186da4 --- /dev/null +++ b/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.utils + +object ExceptionUtils { + + /** + * Utility to check the exception for the specified type. + * + * @param throwable + * Exception to check + * @param causeType + * Class of the cause to look for + * @tparam T + * Type of the cause + * @return + * True if the cause is found; false otherwise + */ + def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = { + var current = throwable + while (current != null) { + if (causeType.isInstance(current)) { + return true + } + current = current.getCause + } + false + } +} diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala index 833d2385b03a..123f74770bbd 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala @@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark32 import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.gluten.expression.{ExpressionNames, Sig} import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} +import org.apache.gluten.utils.ExceptionUtils import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext, TaskContextUtils} import org.apache.spark.scheduler.TaskInfo @@ -51,7 +52,10 @@ import org.apache.spark.sql.types.{DecimalType, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.parquet.crypto.ParquetCryptoRuntimeException +import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.MessageType import java.util.{HashMap => JHashMap, Map => JMap, 
Properties} @@ -296,4 +300,19 @@ class Spark32Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } + + override def isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + try { + ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString + false + } catch { + case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + true + case e: Throwable => + e.printStackTrace() + false + } + } } diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala index 2135780d05fb..aba794a161c9 100644 --- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala +++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala @@ -20,6 +20,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.gluten.expression.{ExpressionNames, Sig} import org.apache.gluten.expression.ExpressionNames.{CEIL, FLOOR, KNOWN_NULLABLE, TIMESTAMP_ADD} import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} +import org.apache.gluten.utils.ExceptionUtils import org.apache.spark._ import org.apache.spark.scheduler.TaskInfo @@ -53,7 +54,10 @@ import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.parquet.crypto.ParquetCryptoRuntimeException +import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.MessageType import java.time.ZoneOffset @@ -377,4 +381,19 @@ class Spark33Shims extends SparkShims { override def 
unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } + override def isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + try { + ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString + false + } catch { + case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + true + case e: Throwable => + e.printStackTrace() + false + } + } + } diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala index bedad4c01741..62a4afe1060d 100644 --- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala @@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark34 import org.apache.gluten.expression.{ExpressionNames, Sig} import org.apache.gluten.expression.ExpressionNames.KNOWN_NULLABLE import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} +import org.apache.gluten.utils.ExceptionUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast @@ -55,7 +56,10 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} +import org.apache.parquet.crypto.ParquetCryptoRuntimeException +import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.MessageType import java.time.ZoneOffset @@ -512,4 +516,18 @@ class Spark34Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { plan.unsetTagValue(QueryPlan.OP_ID_TAG) } + override def 
isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + try { + ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString + false + } catch { + case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) => + true + case e: Throwable => + e.printStackTrace() + false + } + } } diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 43ed51579a1b..7b272ce9939e 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -55,7 +55,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.{BlockId, BlockManagerId} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path} import org.apache.parquet.schema.MessageType import java.time.ZoneOffset @@ -549,4 +550,11 @@ class Spark35Shims extends SparkShims { override def unsetOperatorId(plan: QueryPlan[_]): Unit = { QueryPlan.localIdMap.get().remove(plan) } + + override def isParquetFileEncrypted( + fileStatus: LocatedFileStatus, + conf: Configuration): Boolean = { + // TODO: Support will be added (https://github.com/apache/incubator-gluten/pull/8501) + return false + } }