From 1e48d5c26b81fcfaaa92ff9066ce751b3c52136f Mon Sep 17 00:00:00 2001 From: CHENXCHEN Date: Thu, 24 Feb 2022 16:35:00 +0800 Subject: [PATCH 1/3] [SPARK-32432][SQL] Added support for reading ORC/Parquet files with SymlinkTextInputFormat --- .../datasources/CatalogFileIndex.scala | 17 ++- .../PartitioningAwareFileIndex.scala | 11 +- .../SymlinkTextInputFormatUtil.scala | 63 ++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 27 +++- .../data/files/symlink_table/csv-part-1.csv | 1 + .../data/files/symlink_table/csv-part-2.csv | 2 + .../data/files/symlink_table/json-part-1.json | 1 + .../data/files/symlink_table/json-part-2.json | 2 + .../files/symlink_table/orc-part-1.snappy.orc | Bin 0 -> 508 bytes .../files/symlink_table/orc-part-2.snappy.orc | Bin 0 -> 544 bytes .../parquet-part-1.snappy.parquet | Bin 0 -> 948 bytes .../parquet-part-2.snappy.parquet | Bin 0 -> 952 bytes .../apache/spark/sql/hive/SymlinkSuite.scala | 138 ++++++++++++++++++ 13 files changed, 253 insertions(+), 9 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet create mode 100644 sql/hive/src/test/resources/data/files/symlink_table/parquet-part-2.snappy.parquet create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 727b33018fbcf..50b3e91164bc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -72,12 +72,21 @@ class CatalogFileIndex( val startTime = System.nanoTime() val selectedPartitions = ExternalCatalogUtils.listPartitionsByFilter( sparkSession.sessionState.conf, sparkSession.sessionState.catalog, table, filters) - val partitions = selectedPartitions.map { p => + val isSymlinkTextFormat = SymlinkTextInputFormatUtil.isSymlinkTextFormat(table) + val partitions = selectedPartitions.flatMap { p => val path = new Path(p.location) val fs = path.getFileSystem(hadoopConf) - PartitionPath( - p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), - path.makeQualified(fs.getUri, fs.getWorkingDirectory)) + if (isSymlinkTextFormat) { + SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path).map { targetPath => + PartitionPath( + p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), + targetPath.makeQualified(fs.getUri, fs.getWorkingDirectory)) + } + } else { + Some(PartitionPath( + p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), + path.makeQualified(fs.getUri, fs.getWorkingDirectory))) + } } val partitionSpec = PartitionSpec(partitionSchema, partitions) val timeNs = System.nanoTime() - startTime diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index d70c4b11bc0d7..57893601568b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -122,7 +122,16 @@ abstract class PartitioningAwareFileIndex( // Directory does not exist, or has no children files Nil } - PartitionDirectory(values, files) + // Check leaf files since they might be symlink targets + if (files.isEmpty) { + val status: Seq[FileStatus] = leafFiles.get(path) match { + case Some(existingFile) if isNonEmptyFile(existingFile) => Seq(existingFile) + case _ => Nil + } + PartitionDirectory(values, status) + } else { + PartitionDirectory(values, files) + } } } logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala new file mode 100644 index 0000000000000..b9febe9cbab19 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{BufferedReader, InputStreamReader} +import java.nio.charset.StandardCharsets.UTF_8 + +import com.google.common.io.CharStreams +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.catalog.CatalogTable + +object SymlinkTextInputFormatUtil { + + def isSymlinkTextFormat(inputFormat: String): Boolean = { + inputFormat.equals("org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") + } + + def isSymlinkTextFormat(catalogTable: CatalogTable): Boolean = { + catalogTable.storage.inputFormat.exists(isSymlinkTextFormat) + } + + // Mostly copied from BackgroundHiveSplitLoader#getTargetPathsFromSymlink of trino(prestosql) + def getTargetPathsFromSymlink( + fileSystem: FileSystem, + symlinkDir: Path): Seq[Path] = { + + val symlinks = fileSystem.listStatus(symlinkDir, new PathFilter() { + override def accept(p: Path): Boolean = DataSourceUtils.isDataPath(p) + }) + + symlinks.flatMap { + case fileStatus if fileStatus.isFile => + val reader = new BufferedReader( + new InputStreamReader(fileSystem.open(fileStatus.getPath), UTF_8)) + try { + CharStreams.readLines(reader).asScala + .map(symlinkStr => new Path(symlinkStr)) + } finally { + reader.close() + Seq.empty + } + case _ => + Seq.empty + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b6f06f5989d2f..b59f798b56d65 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -204,10 +204,23 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log (options, None) } + val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration) + val isSymlinkTextFormat = SymlinkTextInputFormatUtil.isSymlinkTextFormat(relation.tableMeta) + + val symlinkTargets = if (isSymlinkTextFormat) { + SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath) + } else { + Nil + } + val result = if (relation.isPartitioned) { val partitionSchema = relation.tableMeta.partitionSchema val rootPaths: Seq[Path] = if (lazyPruningEnabled) { - Seq(tablePath) + if (isSymlinkTextFormat) { + symlinkTargets + } else { + Seq(tablePath) + } } else { // By convention (for example, see CatalogFileIndex), the definition of a // partitioned table's paths depends on whether that table has any actual partitions. @@ -220,6 +233,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (paths.isEmpty) { Seq(tablePath) + } else if (isSymlinkTextFormat) { + paths.flatMap(path => SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path)) } else { paths } @@ -264,11 +279,15 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } else { - val rootPath = tablePath + val rootPaths = if (isSymlinkTextFormat) { + symlinkTargets + } else { + Seq(tablePath) + } withTableCreationLock(tableIdentifier, { val cached = getCached( tableIdentifier, - Seq(rootPath), + rootPaths, metastoreSchema, fileFormatClass, None) @@ -278,7 +297,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log LogicalRelation( DataSource( sparkSession = sparkSession, - paths = rootPath.toString :: Nil, + paths = rootPaths.map(_.toString), userSpecifiedSchema = Option(updatedTable.dataSchema), bucketSpec = hiveBucketSpec, // Do not interpret the 'path' option at all when tables are read using the Hive diff --git a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv new file mode 100644 index 0000000000000..eac3d00ebe14c --- /dev/null +++ b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv @@ -0,0 +1 @@ +Spark,3.2,1 diff --git a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv new file mode 100644 index 0000000000000..4984c9ce35e9f --- /dev/null +++ b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv @@ -0,0 +1,2 @@ +Hive,2.3,3 +Trino,371.0,2 diff --git a/sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json b/sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json new file mode 100644 index 0000000000000..d03c653dbc87b --- /dev/null +++ b/sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json @@ -0,0 +1 @@ +{"name": "Spark", "version": 3.2, "sort": 1} \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json b/sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json new file mode 100644 index 0000000000000..8f99bf3cd967e --- /dev/null +++ b/sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json @@ -0,0 +1,2 @@ +{"name": "Hive", "version": 2.3, "sort": 3} +{"name": "Trino", "version": 371, "sort": 2} diff --git a/sql/hive/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc b/sql/hive/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc new file mode 100644 index 0000000000000000000000000000000000000000..122b059e15eeb847d40d9c1d4e686f8ffd59b230 GIT binary patch literal 508 zcmZvY&q~8U5XNVAn`E;sC5a&0Sg4H!qe6)EV9BXikRqiGRj~&zQHuYtDXH{a#0L-$ zLi;4%eGcEjgAd@QQYpoO*>Bm|o&9~&={8INu38y}AvEN+#1%-xT4(4OTsav=$-VU} zf`@Pnuv6pPoX9I>hW3U&n<6s3ZkoDVm`&5r>mlsPUTd7DopKB3_Gcp?uDVe$$P7P-n)14LF~3`Iq*M7RoTQXymo4HQf&k*X?2h6waSG^kK5!aycj z=o`TKDL>MdEx^87M(PrXC1lr4gMp2PDz6m()vrg2|c`2_PqnI%HB~rda5cDfDm E16D6ffdBvi literal 0 HcmV?d00001 diff --git a/sql/hive/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc b/sql/hive/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc new file mode 100644 index 0000000000000000000000000000000000000000..850d406e036771561f5c5e05cee2bea545b23b4a GIT binary patch literal 544 zcmZ`$!AiqG5S^VQ+wB@OZV=a4#I^^kg;1#=^b{#dL9n6p5RX+`En-_!saNp}{1I<{ zfQU!^5l)16&;v;3vYP5H&bs^=K&;Froa$(=({2<7?)ZQn46PktvhNfs~LBD zeXT+W0oJ$Vnnaq&3iOuK9gEo*%mZUvrR!kn^@XZ=9fVLMHCGyAEnA@C!gRQnA-Diz z(L^!@u*Y_@0G*^TplJZoNr;m>fG{Ju;11{+O9-a(^r9|WqGo7K40D*a36YRn4smY6 zafJe?afd${KOD5;bAOcf zAC*=1%YLbZmBe@DKi`$9Z-!r|c=&mW$gTWx9o!w%AYw<>roZi%#WQnI-`qGk1iNrp MO5c7MM!iwhpIm!hAOHXW literal 0 HcmV?d00001 diff --git a/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet b/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..6c234fca657e5d06b6b8a5764a80aef2a68e2636 GIT binary patch literal 948 zcmah|ON-M`6u!BA7)xOkdM>#Tqy!pfpoTV?X*;D1IxCSbif#%blBTz{m_8;+6)D}$ zPY~?Vh4VLNp*xr3&iN6#b>qpSZ9v2ja?ZKu`@Vb5xi`JX&+0(n0cdc0b^Y}~1&U!F zWI_&Fln|1Ua`=6|u)Q~!xCbTpeEZ?M6yxL&C##!};dA#90%Rz{r>oyT3n@UhD4_(u zPq~tdgtitaXGucdYH(%erqbC+L5WPVtTGvTNGDuneK=3NjRft`MNs!i(GSah7Al=bIcKp5pJ@944$MSeo$JdbHW8D(A8 z)8$UmN@5r}j9W`$EDDE#iiEIV6iVzy0+Z-{6$-pUWWL)pQaoeldcU)8eD9&|ZXEm7 zwEATR)h{WW5q%S$N-Q6B5otGA8xx&)bnzEOqd~?|T5OX6#=cN62)mXT)n)cgD&d5y ziZb`chP|+HltwgFBbl0#vkc3q5#{JFgn!{0cmNkm5g6Y5oiP=GJ+uQ``5~>e;Qc~q zZOsq7r4wjXZ8QZW0q4+qPMJ1jSw w!m2r~+Nou=8p1g-9m^Tn4aYhjG{wlW%(`Wo>^gTVF2C@7QL2qh(t>oHuuDacHW;W8_vY(b{kymW<=8Pf5HiOXd%0WdLUWz!0k z$O+m(R_Ks)vZu=&Ifx%obaYq@CwB1cY3L03$R=w*iWPy0zCGdF2?!boe3je~)1Pi)LiPh!-wm{`HoO}!3m z^Djks>gkyH>6Ax$Fh18uJhFRsWV5finHoKxaJ8+5Q80BPwWju6?)5^o{b;7f(c)5a ztE~uKe}F=N>Unn8Lj_uSO}(P3?b&=@Te!3G(2%{UH=f3>RY2eN!haLuFTb?Ae#8en z_^pP>MoYhfo=nX?R%GK`F^?K!2i<@<4r@m{NJ5BCv8;>mnv*7g7Z literal 0 HcmV?d00001 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala new file mode 100644 index 0000000000000..6d184d6e272f0 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import java.io.{FileWriter, PrintWriter} + +import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + + +class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { + + import spark.implicits._ + + private val tuples: Seq[(String, Double, Int)] = Seq( + ("Spark", 3.2, 1) + , ("Hive", 2.3, 3) + , ("Trino", 371, 2) + ) + lazy val answerNoPartition: DataFrame = tuples.toDF("name", "version", "sort") + + private val tuplesWithPartition: Seq[(String, Double, Int, String)] = Seq( + ("Spark", 3.2, 1, "2022-02-21") + , ("Hive", 2.3, 3, "2022-02-19") + , ("Trino", 371, 2, "2022-02-20") + ) + lazy val answerWithPartition: DataFrame = + tuplesWithPartition.toDF("name", "version", "sort", "pt") + + val schemaDDL = "name STRING, version DOUBLE, sort INT" + val partitionSchemaDDL = "pt STRING" + + lazy val userDir: String = System.getProperty("user.dir") + lazy val csvFiles = Seq( + s"$userDir/src/test/resources/data/files/symlink_table/csv-part-1.csv", + s"$userDir/src/test/resources/data/files/symlink_table/csv-part-2.csv") + + lazy val jsonFiles = Seq( + s"$userDir/src/test/resources/data/files/symlink_table/json-part-1.json", + s"$userDir/src/test/resources/data/files/symlink_table/json-part-2.json") + + lazy val orcFiles = Seq( + s"$userDir/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc", + s"$userDir/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc") + + lazy val parquetFiles = Seq( + s"$userDir/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet", + s"$userDir/src/test/resources/data/files/symlink_table/parquet-part-2.snappy.parquet") + + def writeContentToPath(content: String, manifestPath: String): Unit = { + val writer = new PrintWriter(new FileWriter(manifestPath, false)) + writer.write(content) + writer.close() + } + + def transformFilePath(files: Seq[String]): String = { + files.map(file => s"file://$file").mkString("\n") + "\n" + } + + case class SymlinkTable(tableName: String, rowFormat: String, files: Seq[String], ddl: String) + + private val orcSymlinkTable: SymlinkTable = + SymlinkTable("symlink_orc", + "SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'", orcFiles, schemaDDL) + private val parquetSymlinkTable: SymlinkTable = SymlinkTable("symlink_parquet" + , "SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" + , parquetFiles, schemaDDL) + private val csvSymlinkTable: SymlinkTable = SymlinkTable("symlink_csv" + , "DELIMITED FIELDS TERMINATED BY ','", csvFiles, schemaDDL) + + test("query symlink table with orc format") { + checkSymlinkTable(orcSymlinkTable) + } + + test("query symlink table with paruqet format") { + checkSymlinkTable(parquetSymlinkTable) + } + + // csv/json format not work at Spark 3.2, don't known why + // test("query symlink table with csv format") { + // checkSymlinkTable(csvSymlinkTable) + // } + // + // test("query symlink table with json format") { + // checkSymlinkTable(jsonSymlinkTable) + // } + + def checkSymlinkTable(symlinkTable: SymlinkTable, isPartition: Boolean = false): Unit = { + val tableName = symlinkTable.tableName + val schemaDDL = symlinkTable.ddl + withTempDir { temp => + withTable(tableName) { + // generate manifest for symlink table + if (isPartition) { + + } else { + val manifest = s"${temp.getCanonicalPath}/manifest" + writeContentToPath(transformFilePath(symlinkTable.files), + manifest) + } + + // drop table if exists + spark.sql(s"DROP TABLE IF EXISTS $tableName") + + // create external table + spark.sql( + s"""CREATE EXTERNAL TABLE $tableName ( $schemaDDL ) + |ROW FORMAT ${symlinkTable.rowFormat} + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + |LOCATION 'file://${temp.getCanonicalPath}' + |""".stripMargin + ) + + // read table and compared result + val df = spark.sql(s"select * from $tableName") + df.printSchema() + df.show() + checkAnswer(df, answerNoPartition) + } + } + } +} From 119109df31a363a8f6bbbc7857bc794085bc1619 Mon Sep 17 00:00:00 2001 From: CHENXCHEN Date: Wed, 2 Mar 2022 18:45:37 +0800 Subject: [PATCH 2/3] add analyze symlink table fix and Suite case --- .../command/AnalyzePartitionCommand.scala | 14 +- .../sql/execution/command/CommandUtils.scala | 37 +++- .../SymlinkTextInputFormatUtil.scala | 36 +++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../data/files/symlink_table/csv-part-1.csv | 2 +- .../data/files/symlink_table/csv-part-2.csv | 2 +- .../data/files/symlink_table/json-part-1.json | 1 - .../data/files/symlink_table/json-part-2.json | 2 - .../apache/spark/sql/hive/SymlinkSuite.scala | 172 ++++++++++++++---- 9 files changed, 219 insertions(+), 49 deletions(-) delete mode 100644 sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json delete mode 100644 sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 38d92ba752ad7..9935ab46b68c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -17,13 +17,16 @@ package org.apache.spark.sql.execution.command +import org.apache.hadoop.fs.Path + import org.apache.spark.sql.{Column, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.SymlinkTextInputFormatUtil import org.apache.spark.sql.util.PartitioningUtils /** @@ -103,11 +106,14 @@ case class AnalyzePartitionCommand( calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec) } + // get real symlink table size + val isSymlinkTable = SymlinkTextInputFormatUtil.isSymlinkTextFormat(tableMeta) + lazy val fs = new Path(tableMeta.location) + .getFileSystem(sparkSession.sparkContext.hadoopConfiguration) // Update the metastore if newly computed statistics are different from those // recorded in the metastore. - - val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier, - partitions.map(_.storage.locationUri)) + val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier + , CommandUtils.getPartitionPaths(partitions, isSymlinkTable, fs)) val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) => val newRowCount = rowCounts.get(p.spec) val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 312f17543ce26..3055ce9aca4d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -25,16 +25,16 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex, SymlinkTextInputFormatUtil} import org.apache.spark.sql.internal.{SessionState, SQLConf} import org.apache.spark.sql.types._ @@ -74,21 +74,44 @@ object CommandUtils extends Logging { def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { val sessionState = spark.sessionState val startTime = System.nanoTime() + + val isSymlinkTable = SymlinkTextInputFormatUtil.isSymlinkTextFormat(catalogTable) + lazy val fs = new Path(catalogTable.location) + .getFileSystem(spark.sessionState.newHadoopConf()) val totalSize = if (catalogTable.partitionColumnNames.isEmpty) { - calculateSingleLocationSize(sessionState, catalogTable.identifier, - catalogTable.storage.locationUri) + if (isSymlinkTable) { + calculateMultipleLocationSizes(spark, catalogTable.identifier + , SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths(fs, catalogTable.location) + ).sum + } else { + calculateSingleLocationSize(sessionState, catalogTable.identifier, + catalogTable.storage.locationUri) + } } else { // Calculate table size as a sum of the visible partitions. See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.") - val paths = partitions.map(_.storage.locationUri) - calculateMultipleLocationSizes(spark, catalogTable.identifier, paths).sum + calculateMultipleLocationSizes(spark, catalogTable.identifier, + getPartitionPaths(partitions, isSymlinkTable, fs)).sum } logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" + s" the total size for table ${catalogTable.identifier}.") totalSize } + def getPartitionPaths(partitions: Seq[CatalogTablePartition] + , isSymlinkTable: Boolean, fs: => FileSystem): Seq[Option[URI]] = { + partitions.flatMap { catalogTablePartition => + if (isSymlinkTable) { + catalogTablePartition.storage.locationUri + .map(SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths(fs, _)) + .getOrElse(Seq.empty) + } else { + Seq(catalogTablePartition.storage.locationUri) + } + } + } + def calculateSingleLocationSize( sessionState: SessionState, identifier: TableIdentifier, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala index b9febe9cbab19..a9cf87cedf386 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources import java.io.{BufferedReader, InputStreamReader} +import java.net.URI import java.nio.charset.StandardCharsets.UTF_8 import com.google.common.io.CharStreams @@ -28,15 +29,35 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable object SymlinkTextInputFormatUtil { + /** + * Determine if InputFormat is SymlinkTable + * + * @param inputFormat Table InputFormat + * @return + */ def isSymlinkTextFormat(inputFormat: String): Boolean = { inputFormat.equals("org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") } + /** + * Determine CatalogTable is SymlinkTable + * + * @param catalogTable CatalogTable + * @return + */ def isSymlinkTextFormat(catalogTable: CatalogTable): Boolean = { catalogTable.storage.inputFormat.exists(isSymlinkTextFormat) } - // Mostly copied from BackgroundHiveSplitLoader#getTargetPathsFromSymlink of trino(prestosql) + /** + * Get symlink files from target path + * Mostly copied from BackgroundHiveSplitLoader#getTargetPathsFromSymlink of trino(prestosql) + * compatible with hive SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs + * + * @param fileSystem filesystem + * @param symlinkDir symlink table location + * @return + */ def getTargetPathsFromSymlink( fileSystem: FileSystem, symlinkDir: Path): Seq[Path] = { @@ -60,4 +81,17 @@ object SymlinkTextInputFormatUtil { Seq.empty } } + + /** + * Get symlink uris from target path + * + * @param fileSystem filesystem + * @param location symlink table location + * @return + */ + def getSymlinkTableLocationPaths(fileSystem: FileSystem, location: URI): Seq[Option[URI]] = { + SymlinkTextInputFormatUtil + .getTargetPathsFromSymlink(fileSystem, new Path(location)) + .map(path => Option(path.toUri)) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b59f798b56d65..58f632bc78063 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -204,7 +204,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log (options, None) } - val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration) + lazy val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration) val isSymlinkTextFormat = SymlinkTextInputFormatUtil.isSymlinkTextFormat(relation.tableMeta) val symlinkTargets = if (isSymlinkTextFormat) { diff --git a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv index eac3d00ebe14c..27b0f5615a05c 100644 --- a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv +++ b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv @@ -1 +1 @@ -Spark,3.2,1 +Spark,3.2,1 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv index 4984c9ce35e9f..f1e79145cbb59 100644 --- a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv +++ b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv @@ -1,2 +1,2 @@ -Hive,2.3,3 Trino,371.0,2 +Hive,2.3,3 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json b/sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json deleted file mode 100644 index d03c653dbc87b..0000000000000 --- a/sql/hive/src/test/resources/data/files/symlink_table/json-part-1.json +++ /dev/null @@ -1 +0,0 @@ -{"name": "Spark", "version": 3.2, "sort": 1} \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json b/sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json deleted file mode 100644 index 8f99bf3cd967e..0000000000000 --- a/sql/hive/src/test/resources/data/files/symlink_table/json-part-2.json +++ /dev/null @@ -1,2 +0,0 @@ -{"name": "Hive", "version": 2.3, "sort": 3} -{"name": "Trino", "version": 371, "sort": 2} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala index 6d184d6e272f0..3ade887e8672e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala @@ -16,17 +16,26 @@ */ package org.apache.spark.sql.hive -import java.io.{FileWriter, PrintWriter} +import java.io.{File, FileWriter, PrintWriter} +import org.apache.spark.internal.config import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AnalyzeTableCommand} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.DataTypes class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { import spark.implicits._ + // SymlinkTextInputFormat table will get file splits size is empty + // see SymlinkTextInputSplit.SymlinkTextInputSplit at hive 2.3 + spark.sparkContext.conf.set(config.HADOOP_RDD_IGNORE_EMPTY_SPLITS.key, "false") + private val tuples: Seq[(String, Double, Int)] = Seq( ("Spark", 3.2, 1) , ("Hive", 2.3, 3) @@ -35,9 +44,9 @@ class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { lazy val answerNoPartition: DataFrame = tuples.toDF("name", "version", "sort") private val tuplesWithPartition: Seq[(String, Double, Int, String)] = Seq( - ("Spark", 3.2, 1, "2022-02-21") + ("Spark", 3.2, 1, "2022-02-20") , ("Hive", 2.3, 3, "2022-02-19") - , ("Trino", 371, 2, "2022-02-20") + , ("Trino", 371, 2, "2022-02-19") ) lazy val answerWithPartition: DataFrame = tuplesWithPartition.toDF("name", "version", "sort", "pt") @@ -50,10 +59,6 @@ class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { s"$userDir/src/test/resources/data/files/symlink_table/csv-part-1.csv", s"$userDir/src/test/resources/data/files/symlink_table/csv-part-2.csv") - lazy val jsonFiles = Seq( - s"$userDir/src/test/resources/data/files/symlink_table/json-part-1.json", - s"$userDir/src/test/resources/data/files/symlink_table/json-part-2.json") - lazy val orcFiles = Seq( s"$userDir/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc", s"$userDir/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc") @@ -63,6 +68,7 @@ class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { s"$userDir/src/test/resources/data/files/symlink_table/parquet-part-2.snappy.parquet") def writeContentToPath(content: String, manifestPath: String): Unit = { + new File(manifestPath).getParentFile.mkdir() val writer = new PrintWriter(new FileWriter(manifestPath, false)) writer.write(content) writer.close() @@ -72,46 +78,102 @@ class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { files.map(file => s"file://$file").mkString("\n") + "\n" } - case class SymlinkTable(tableName: String, rowFormat: String, files: Seq[String], ddl: String) + case class SymlinkTable(tableName: String, rowFormat: String, files: Seq[String], ddl: String + , size: Long) private val orcSymlinkTable: SymlinkTable = SymlinkTable("symlink_orc", - "SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'", orcFiles, schemaDDL) + "SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'", orcFiles, schemaDDL, 1052) private val parquetSymlinkTable: SymlinkTable = SymlinkTable("symlink_parquet" , "SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" - , parquetFiles, schemaDDL) + , parquetFiles, schemaDDL, 1900) private val csvSymlinkTable: SymlinkTable = SymlinkTable("symlink_csv" - , "DELIMITED FIELDS TERMINATED BY ','", csvFiles, schemaDDL) + , "SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'" + , csvFiles, schemaDDL, 35) + + // query symlink table with csv format and check answer + test("[csv][symlink table][data]query check") { + createSymlinkTable(csvSymlinkTable)(checkTableAnswer) + } + + // analyze symlink table with csv format and check size in bytes + test("[csv][symlink table][size in bytes]analyze check") { + createSymlinkTable(csvSymlinkTable)(checkTableSize) + } + + // query symlink partition table with orc format and check answer + test("[csv][symlink table][partition][data]query check") { + createSymlinkTable(csvSymlinkTable, isPartition = true)(checkTableAnswer) + } + + // analyze symlink partition table with orc format and check size in bytes + test("[csv][symlink table][partition][size in bytes]analyze check") { + createSymlinkTable(csvSymlinkTable, isPartition = true)(checkTableSize) + } + + // query symlink table with orc format and check answer + test("[orc][symlink table][data]query check") { + createSymlinkTable(orcSymlinkTable)(checkTableAnswer) + } + + // analyze symlink table with orc format and check size in bytes + test("[orc][symlink table][size in bytes]analyze check") { + createSymlinkTable(orcSymlinkTable)(checkTableSize) + } + + + // query symlink partition table with orc format and check answer + test("[orc][symlink table][partition][data]query check") { + createSymlinkTable(orcSymlinkTable, isPartition = true)(checkTableAnswer) + } + + // analyze symlink partition table with orc format and check size in bytes + test("[orc][symlink table][partition][size in bytes]analyze check") { + createSymlinkTable(orcSymlinkTable, isPartition = true)(checkTableSize) + } + + + // query symlink table with parquet format and check answer + test("[parquet][symlink table][data]query check") { + createSymlinkTable(parquetSymlinkTable)(checkTableAnswer) + } - test("query symlink table with orc format") { - checkSymlinkTable(orcSymlinkTable) + // analyze symlink table with parquet format and check size in bytes + test("[parquet][symlink table][size in bytes]analyze check") { + createSymlinkTable(parquetSymlinkTable)(checkTableSize) } - test("query symlink table with paruqet format") { - checkSymlinkTable(parquetSymlinkTable) + + // query parquet partition table with orc format and check answer + test("[parquet][symlink partition table][data]query check") { + createSymlinkTable(parquetSymlinkTable, isPartition = true)(checkTableAnswer) } - // csv/json format not work at Spark 3.2, don't known why - // test("query symlink table with csv format") { - // checkSymlinkTable(csvSymlinkTable) - // } - // - // test("query symlink table with json format") { - // checkSymlinkTable(jsonSymlinkTable) - // } + // analyze parquet partition table with orc format and check size in bytes + test("[parquet][symlink partition table][size in bytes]analyze check") { + createSymlinkTable(parquetSymlinkTable, isPartition = true)(checkTableSize) + } - def checkSymlinkTable(symlinkTable: SymlinkTable, isPartition: Boolean = false): Unit = { + def createSymlinkTable(symlinkTable: SymlinkTable, isPartition: Boolean = false) + (fuc: (SymlinkTable, Boolean) => Unit): Unit = { val tableName = symlinkTable.tableName val schemaDDL = symlinkTable.ddl withTempDir { temp => withTable(tableName) { // generate manifest for symlink table - if (isPartition) { - + val partitionDDL = if (isPartition) { + val manifest1 = s"${temp.getCanonicalPath}/pt=2022-02-20/manifest" + writeContentToPath(transformFilePath(symlinkTable.files.headOption.toSeq) + , manifest1) + val manifest2 = s"${temp.getCanonicalPath}/pt=2022-02-19/manifest" + writeContentToPath(transformFilePath(symlinkTable.files.lastOption.toSeq) + , manifest2) + s"PARTITIONED BY ($partitionSchemaDDL)" } else { val manifest = s"${temp.getCanonicalPath}/manifest" writeContentToPath(transformFilePath(symlinkTable.files), manifest) + "" } // drop table if exists @@ -120,6 +182,7 @@ class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { // create external table spark.sql( s"""CREATE EXTERNAL TABLE $tableName ( $schemaDDL ) + |$partitionDDL |ROW FORMAT ${symlinkTable.rowFormat} |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' @@ -127,12 +190,59 @@ class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { |""".stripMargin ) - // read table and compared result - val df = spark.sql(s"select * from $tableName") - df.printSchema() - df.show() - checkAnswer(df, answerNoPartition) + // if this is partition, we should add partition for test + if (isPartition) { + AlterTableAddPartitionCommand(TableIdentifier(tableName), + Seq( + (Map("pt" -> "2022-02-20"), None), + (Map("pt" -> "2022-02-19"), None) + ), + ifNotExists = true).run(spark) + } + + fuc(symlinkTable, isPartition) } } } + + def castDF(df: DataFrame): DataFrame = { + df.withColumn("version", 'version.cast(DataTypes.DoubleType)) + .withColumn("sort", 'sort.cast(DataTypes.IntegerType)) + } + + def checkTableAnswer(symlinkTable: SymlinkTable, isPartition: Boolean = false): Unit = { + // read table and compared result + val tableName = symlinkTable.tableName + var df = spark.sql(s"select * from $tableName") + df.printSchema() + df.show() + // spark will transform all columns to string when using OpenCSVSerde for row format + if (symlinkTable.rowFormat.contains("OpenCSVSerde")) { + df = castDF(df) + } + if (isPartition) { + checkAnswer(df, answerWithPartition) + } else { + checkAnswer(df, answerNoPartition) + } + } + + def checkTableSize(symlinkTable: SymlinkTable, isPartition: Boolean = false): Unit = { + val tableName = symlinkTable.tableName + analyzeTable(tableName) + // scalastyle:off println + val stats = getCatalogTable(tableName).stats + println(tableName, stats) + assert(stats.get.sizeInBytes == symlinkTable.size) + } + + + def analyzeTable(tbl: String, db: Option[String] = Option.empty, noscan: Boolean = true): Unit = { + AnalyzeTableCommand(TableIdentifier(tbl, db), noscan) + .run(spark) + } + + def getCatalogTable(tableName: String): CatalogTable = { + spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)) + } } From e725b16d5d2a17277a32c277a826ecc812e38bc7 Mon Sep 17 00:00:00 2001 From: CHENXCHEN Date: Tue, 8 Mar 2022 10:32:39 +0800 Subject: [PATCH 3/3] format code --- .../apache/spark/sql/execution/command/CommandUtils.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 3055ce9aca4d1..622b9801ad83d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -99,8 +99,10 @@ object CommandUtils extends Logging { totalSize } - def getPartitionPaths(partitions: Seq[CatalogTablePartition] - , isSymlinkTable: Boolean, fs: => FileSystem): Seq[Option[URI]] = { + def getPartitionPaths( + partitions: Seq[CatalogTablePartition], + isSymlinkTable: Boolean, + fs: => FileSystem): Seq[Option[URI]] = { partitions.flatMap { catalogTablePartition => if (isSymlinkTable) { catalogTablePartition.storage.locationUri