diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 38d92ba752ad7..9935ab46b68c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -17,13 +17,16 @@ package org.apache.spark.sql.execution.command +import org.apache.hadoop.fs.Path + import org.apache.spark.sql.{Column, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.SymlinkTextInputFormatUtil import org.apache.spark.sql.util.PartitioningUtils /** @@ -103,11 +106,14 @@ case class AnalyzePartitionCommand( calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec) } + // get real symlink table size + val isSymlinkTable = SymlinkTextInputFormatUtil.isSymlinkTextFormat(tableMeta) + lazy val fs = new Path(tableMeta.location) + .getFileSystem(sparkSession.sparkContext.hadoopConfiguration) // Update the metastore if newly computed statistics are different from those // recorded in the metastore. 
- - val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier, - partitions.map(_.storage.locationUri)) + val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier + , CommandUtils.getPartitionPaths(partitions, isSymlinkTable, fs)) val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) => val newRowCount = rowCounts.get(p.spec) val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 312f17543ce26..622b9801ad83d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -25,16 +25,16 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex, SymlinkTextInputFormatUtil} import 
/**
 * Computes the total size of `catalogTable` and logs how long the computation took.
 *
 * - Partitioned table: sums the sizes of the paths produced by [[getPartitionPaths]] for the
 *   partitions listed by the session catalog (see SPARK-21079).
 * - Non-partitioned symlink table: sums the sizes of the locations returned by
 *   `SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths` for the table location.
 * - Any other non-partitioned table: measures the table's storage location directly.
 *
 * @param spark        active session, used for the catalog and the Hadoop configuration
 * @param catalogTable metadata of the table to measure
 * @return the total size as computed by the location-size helpers
 */
def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
  val sessionState = spark.sessionState
  val startTime = System.nanoTime()

  val isSymlinkTable = SymlinkTextInputFormatUtil.isSymlinkTextFormat(catalogTable)
  // Lazy on purpose: the file system handle is only resolved on the symlink code paths.
  lazy val fs = new Path(catalogTable.location)
    .getFileSystem(sessionState.newHadoopConf())

  val isPartitioned = catalogTable.partitionColumnNames.nonEmpty
  val totalSize = if (isPartitioned) {
    // Calculate table size as a sum of the visible partitions. See SPARK-21079
    val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
    logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
    val partitionPaths = getPartitionPaths(partitions, isSymlinkTable, fs)
    calculateMultipleLocationSizes(spark, catalogTable.identifier, partitionPaths).sum
  } else if (isSymlinkTable) {
    val targetPaths =
      SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths(fs, catalogTable.location)
    calculateMultipleLocationSizes(spark, catalogTable.identifier, targetPaths).sum
  } else {
    calculateSingleLocationSize(sessionState, catalogTable.identifier,
      catalogTable.storage.locationUri)
  }

  val elapsedMs = (System.nanoTime() - startTime) / (1000 * 1000)
  logInfo(s"It took $elapsedMs ms to calculate" +
    s" the total size for table ${catalogTable.identifier}.")
  totalSize
}

/**
 * Returns the locations whose sizes should be measured for the given partitions.
 *
 * For a regular table the result holds exactly one entry per partition (its optional storage
 * location), in partition order. For a symlink table the result is flattened: every partition
 * contributes one entry per path returned by
 * `SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths`, and a partition without a
 * location contributes nothing. The symlink result is therefore NOT index-aligned with
 * `partitions`; callers that look sizes up by partition position must account for that.
 *
 * @param partitions     partitions to resolve
 * @param isSymlinkTable whether the owning table uses the symlink input format
 * @param fs             by-name file system, only evaluated for symlink tables
 * @return the locations to measure
 */
def getPartitionPaths(
    partitions: Seq[CatalogTablePartition],
    isSymlinkTable: Boolean,
    fs: => FileSystem): Seq[Option[URI]] = {
  if (isSymlinkTable) {
    partitions.flatMap { partition =>
      partition.storage.locationUri match {
        case Some(uri) => SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths(fs, uri)
        case None => Seq.empty
      }
    }
  } else {
    partitions.map(_.storage.locationUri)
  }
}
path.getFileSystem(hadoopConf) - PartitionPath( - p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), - path.makeQualified(fs.getUri, fs.getWorkingDirectory)) + if (isSymlinkTextFormat) { + SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path).map { targetPath => + PartitionPath( + p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), + targetPath.makeQualified(fs.getUri, fs.getWorkingDirectory)) + } + } else { + Some(PartitionPath( + p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), + path.makeQualified(fs.getUri, fs.getWorkingDirectory))) + } } val partitionSpec = PartitionSpec(partitionSchema, partitions) val timeNs = System.nanoTime() - startTime diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index d70c4b11bc0d7..57893601568b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -122,7 +122,16 @@ abstract class PartitioningAwareFileIndex( // Directory does not exist, or has no children files Nil } - PartitionDirectory(values, files) + // Check leaf files since they might be symlink targets + if (files.isEmpty) { + val status: Seq[FileStatus] = leafFiles.get(path) match { + case Some(existingFile) if isNonEmptyFile(existingFile) => Seq(existingFile) + case _ => Nil + } + PartitionDirectory(values, status) + } else { + PartitionDirectory(values, files) + } } } logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala new file mode 100644 index 0000000000000..a9cf87cedf386 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Helpers for tables stored with Hive's `SymlinkTextInputFormat`, where the table (or
 * partition) location holds manifest files and each manifest line is the path of a data file.
 */
object SymlinkTextInputFormatUtil {

  /** Fully-qualified class name of Hive's symlink input format. */
  private val SymlinkTextInputFormatClass =
    "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat"

  /**
   * Determines whether the given input format is the symlink input format.
   *
   * @param inputFormat table input format class name; `null` is treated as "not symlink"
   * @return true only when `inputFormat` is exactly the symlink input format class name
   */
  def isSymlinkTextFormat(inputFormat: String): Boolean = {
    // Comparing from the constant side keeps a null input format from throwing.
    SymlinkTextInputFormatClass == inputFormat
  }

  /**
   * Determines whether the given catalog table is a symlink table.
   *
   * @param catalogTable table metadata
   * @return true when the table declares the symlink input format
   */
  def isSymlinkTextFormat(catalogTable: CatalogTable): Boolean = {
    catalogTable.storage.inputFormat.exists(isSymlinkTextFormat)
  }

  /**
   * Reads every manifest file directly under `symlinkDir` and returns the paths listed in
   * them, one path per non-blank line.
   *
   * Mostly copied from BackgroundHiveSplitLoader#getTargetPathsFromSymlink of trino(prestosql),
   * compatible with hive SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs.
   *
   * @param fileSystem file system used to list and open the manifest files
   * @param symlinkDir symlink table (or partition) location
   * @return the target paths, in listing order and then in manifest line order
   */
  def getTargetPathsFromSymlink(
      fileSystem: FileSystem,
      symlinkDir: Path): Seq[Path] = {
    // Only entries accepted by DataSourceUtils.isDataPath are considered manifests.
    val manifests = fileSystem.listStatus(symlinkDir, new PathFilter {
      override def accept(p: Path): Boolean = DataSourceUtils.isDataPath(p)
    })

    // Sub-directories are ignored; only plain files are read as manifests.
    manifests.toSeq.filter(_.isFile).flatMap { manifest =>
      val reader = new BufferedReader(
        new InputStreamReader(fileSystem.open(manifest.getPath), UTF_8))
      try {
        CharStreams.readLines(reader).asScala
          // A blank line would make `new Path("")` throw, so skip it instead.
          .filter(_.trim.nonEmpty)
          .map(line => new Path(line))
          .toList
      } finally {
        reader.close()
      }
    }
  }

  /**
   * Same as [[getTargetPathsFromSymlink]], but takes the location as a URI and returns each
   * target as an optional URI.
   *
   * @param fileSystem file system used to list and open the manifest files
   * @param location   symlink table (or partition) location
   * @return one `Option[URI]` per target path
   */
  def getSymlinkTableLocationPaths(fileSystem: FileSystem, location: URI): Seq[Option[URI]] = {
    getTargetPathsFromSymlink(fileSystem, new Path(location))
      .map(path => Option(path.toUri))
  }
}
@@ -220,6 +233,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (paths.isEmpty) { Seq(tablePath) + } else if (isSymlinkTextFormat) { + paths.flatMap(path => SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path)) } else { paths } @@ -264,11 +279,15 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } else { - val rootPath = tablePath + val rootPaths = if (isSymlinkTextFormat) { + symlinkTargets + } else { + Seq(tablePath) + } withTableCreationLock(tableIdentifier, { val cached = getCached( tableIdentifier, - Seq(rootPath), + rootPaths, metastoreSchema, fileFormatClass, None) @@ -278,7 +297,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log LogicalRelation( DataSource( sparkSession = sparkSession, - paths = rootPath.toString :: Nil, + paths = rootPaths.map(_.toString), userSpecifiedSchema = Option(updatedTable.dataSchema), bucketSpec = hiveBucketSpec, // Do not interpret the 'path' option at all when tables are read using the Hive diff --git a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv new file mode 100644 index 0000000000000..27b0f5615a05c --- /dev/null +++ b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-1.csv @@ -0,0 +1 @@ +Spark,3.2,1 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv new file mode 100644 index 0000000000000..f1e79145cbb59 --- /dev/null +++ b/sql/hive/src/test/resources/data/files/symlink_table/csv-part-2.csv @@ -0,0 +1,2 @@ +Trino,371.0,2 +Hive,2.3,3 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc b/sql/hive/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc new file mode 100644 index 
0000000000000..122b059e15eeb Binary files /dev/null and b/sql/hive/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc differ diff --git a/sql/hive/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc b/sql/hive/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc new file mode 100644 index 0000000000000..850d406e03677 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc differ diff --git a/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet b/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet new file mode 100644 index 0000000000000..6c234fca657e5 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet differ diff --git a/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-2.snappy.parquet b/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-2.snappy.parquet new file mode 100644 index 0000000000000..57dd7c4eeaf29 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/symlink_table/parquet-part-2.snappy.parquet differ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala new file mode 100644 index 0000000000000..3ade887e8672e --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * End-to-end checks for Hive tables stored with `SymlinkTextInputFormat`: each test creates an
 * external table whose location contains manifest files pointing at the bundled csv / orc /
 * parquet test resources, then verifies either the query result or the analyzed table size.
 */
class SymlinkSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {

  import spark.implicits._

  // SymlinkTextInputFormat tables report empty file split sizes (see SymlinkTextInputSplit in
  // Hive 2.3), so empty splits must not be ignored when reading them.
  // NOTE(review): this mutates the shared SparkContext conf and is never restored — confirm it
  // cannot affect other suites running on the same TestHive session.
  spark.sparkContext.conf.set(config.HADOOP_RDD_IGNORE_EMPTY_SPLITS.key, "false")

  private val tuples: Seq[(String, Double, Int)] = Seq(
    ("Spark", 3.2, 1),
    ("Hive", 2.3, 3),
    ("Trino", 371.0, 2)
  )
  lazy val answerNoPartition: DataFrame = tuples.toDF("name", "version", "sort")

  private val tuplesWithPartition: Seq[(String, Double, Int, String)] = Seq(
    ("Spark", 3.2, 1, "2022-02-20"),
    ("Hive", 2.3, 3, "2022-02-19"),
    ("Trino", 371.0, 2, "2022-02-19")
  )
  lazy val answerWithPartition: DataFrame =
    tuplesWithPartition.toDF("name", "version", "sort", "pt")

  val schemaDDL = "name STRING, version DOUBLE, sort INT"
  val partitionSchemaDDL = "pt STRING"

  // Test resources are resolved relative to the working directory of the test JVM.
  lazy val userDir: String = System.getProperty("user.dir")
  lazy val csvFiles = Seq(
    s"$userDir/src/test/resources/data/files/symlink_table/csv-part-1.csv",
    s"$userDir/src/test/resources/data/files/symlink_table/csv-part-2.csv")

  lazy val orcFiles = Seq(
    s"$userDir/src/test/resources/data/files/symlink_table/orc-part-1.snappy.orc",
    s"$userDir/src/test/resources/data/files/symlink_table/orc-part-2.snappy.orc")

  lazy val parquetFiles = Seq(
    s"$userDir/src/test/resources/data/files/symlink_table/parquet-part-1.snappy.parquet",
    s"$userDir/src/test/resources/data/files/symlink_table/parquet-part-2.snappy.parquet")

  /**
   * Writes `content` to `manifestPath`, creating missing parent directories and overwriting
   * any existing file.
   */
  def writeContentToPath(content: String, manifestPath: String): Unit = {
    // mkdirs (not mkdir) so that nested partition directories are created as well.
    new File(manifestPath).getParentFile.mkdirs()
    val writer = new PrintWriter(new FileWriter(manifestPath, false))
    try {
      writer.write(content)
    } finally {
      // Always release the file handle, even when the write fails.
      writer.close()
    }
  }

  /** Renders the given local files as manifest content: one `file://` URI per line. */
  def transformFilePath(files: Seq[String]): String = {
    files.map(file => s"file://$file").mkString("\n") + "\n"
  }

  /**
   * Description of one symlink table under test.
   *
   * @param tableName name of the table to create
   * @param rowFormat text placed after `ROW FORMAT` in the CREATE TABLE statement
   * @param files     data files the manifests point to
   * @param ddl       column definitions of the table
   * @param size      expected `sizeInBytes` after ANALYZE
   */
  case class SymlinkTable(
      tableName: String,
      rowFormat: String,
      files: Seq[String],
      ddl: String,
      size: Long)

  private val orcSymlinkTable: SymlinkTable = SymlinkTable(
    "symlink_orc",
    "SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'",
    orcFiles, schemaDDL, 1052)
  private val parquetSymlinkTable: SymlinkTable = SymlinkTable(
    "symlink_parquet",
    "SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'",
    parquetFiles, schemaDDL, 1900)
  private val csvSymlinkTable: SymlinkTable = SymlinkTable(
    "symlink_csv",
    "SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'",
    csvFiles, schemaDDL, 35)

  // query symlink table with csv format and check answer
  test("[csv][symlink table][data]query check") {
    createSymlinkTable(csvSymlinkTable)(checkTableAnswer)
  }

  // analyze symlink table with csv format and check size in bytes
  test("[csv][symlink table][size in bytes]analyze check") {
    createSymlinkTable(csvSymlinkTable)(checkTableSize)
  }

  // query symlink partition table with csv format and check answer
  test("[csv][symlink table][partition][data]query check") {
    createSymlinkTable(csvSymlinkTable, isPartition = true)(checkTableAnswer)
  }

  // analyze symlink partition table with csv format and check size in bytes
  test("[csv][symlink table][partition][size in bytes]analyze check") {
    createSymlinkTable(csvSymlinkTable, isPartition = true)(checkTableSize)
  }

  // query symlink table with orc format and check answer
  test("[orc][symlink table][data]query check") {
    createSymlinkTable(orcSymlinkTable)(checkTableAnswer)
  }

  // analyze symlink table with orc format and check size in bytes
  test("[orc][symlink table][size in bytes]analyze check") {
    createSymlinkTable(orcSymlinkTable)(checkTableSize)
  }

  // query symlink partition table with orc format and check answer
  test("[orc][symlink table][partition][data]query check") {
    createSymlinkTable(orcSymlinkTable, isPartition = true)(checkTableAnswer)
  }

  // analyze symlink partition table with orc format and check size in bytes
  test("[orc][symlink table][partition][size in bytes]analyze check") {
    createSymlinkTable(orcSymlinkTable, isPartition = true)(checkTableSize)
  }

  // query symlink table with parquet format and check answer
  test("[parquet][symlink table][data]query check") {
    createSymlinkTable(parquetSymlinkTable)(checkTableAnswer)
  }

  // analyze symlink table with parquet format and check size in bytes
  test("[parquet][symlink table][size in bytes]analyze check") {
    createSymlinkTable(parquetSymlinkTable)(checkTableSize)
  }

  // query symlink partition table with parquet format and check answer
  test("[parquet][symlink partition table][data]query check") {
    createSymlinkTable(parquetSymlinkTable, isPartition = true)(checkTableAnswer)
  }

  // analyze symlink partition table with parquet format and check size in bytes
  test("[parquet][symlink partition table][size in bytes]analyze check") {
    createSymlinkTable(parquetSymlinkTable, isPartition = true)(checkTableSize)
  }

  /**
   * Creates the external symlink table described by `symlinkTable` inside a temporary
   * directory, runs `fuc` against it, and lets `withTable` / `withTempDir` clean up.
   *
   * @param symlinkTable table description
   * @param isPartition  when true, the table is partitioned by `pt` and the first / last data
   *                     file are registered under `pt=2022-02-20` / `pt=2022-02-19`
   * @param fuc          check to run once the table exists
   */
  def createSymlinkTable(symlinkTable: SymlinkTable, isPartition: Boolean = false)
      (fuc: (SymlinkTable, Boolean) => Unit): Unit = {
    val tableName = symlinkTable.tableName
    withTempDir { temp =>
      withTable(tableName) {
        val tableDir = temp.getCanonicalPath

        // generate manifest(s) for the symlink table
        val partitionDDL = if (isPartition) {
          writeContentToPath(
            transformFilePath(symlinkTable.files.headOption.toSeq),
            s"$tableDir/pt=2022-02-20/manifest")
          writeContentToPath(
            transformFilePath(symlinkTable.files.lastOption.toSeq),
            s"$tableDir/pt=2022-02-19/manifest")
          s"PARTITIONED BY ($partitionSchemaDDL)"
        } else {
          writeContentToPath(transformFilePath(symlinkTable.files), s"$tableDir/manifest")
          ""
        }

        // drop table if exists
        spark.sql(s"DROP TABLE IF EXISTS $tableName")

        // create external table
        spark.sql(
          s"""CREATE EXTERNAL TABLE $tableName ( ${symlinkTable.ddl} )
             |$partitionDDL
             |ROW FORMAT ${symlinkTable.rowFormat}
             |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
             |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
             |LOCATION 'file://$tableDir'
             |""".stripMargin
        )

        // a partitioned table needs its partitions registered before it can be read
        if (isPartition) {
          AlterTableAddPartitionCommand(
            TableIdentifier(tableName),
            Seq(
              (Map("pt" -> "2022-02-20"), None),
              (Map("pt" -> "2022-02-19"), None)
            ),
            ifNotExists = true).run(spark)
        }

        fuc(symlinkTable, isPartition)
      }
    }
  }

  /** Casts the `version` and `sort` columns to the types used by the expected answers. */
  def castDF(df: DataFrame): DataFrame = {
    df.withColumn("version", $"version".cast(DataTypes.DoubleType))
      .withColumn("sort", $"sort".cast(DataTypes.IntegerType))
  }

  /** Reads the whole table and compares it with the expected rows. */
  def checkTableAnswer(symlinkTable: SymlinkTable, isPartition: Boolean = false): Unit = {
    val rawDF = spark.sql(s"select * from ${symlinkTable.tableName}")
    // spark will transform all columns to string when using OpenCSVSerde for row format
    val df = if (symlinkTable.rowFormat.contains("OpenCSVSerde")) castDF(rawDF) else rawDF
    val expected = if (isPartition) answerWithPartition else answerNoPartition
    checkAnswer(df, expected)
  }

  /** Analyzes the table and checks the recorded size against the expected size. */
  def checkTableSize(symlinkTable: SymlinkTable, isPartition: Boolean = false): Unit = {
    val tableName = symlinkTable.tableName
    analyzeTable(tableName)
    val stats = getCatalogTable(tableName).stats
    assert(stats.isDefined, s"no statistics were recorded for table $tableName")
    assert(stats.map(_.sizeInBytes) === Some(BigInt(symlinkTable.size)))
  }

  /** Runs ANALYZE TABLE on `tbl`; `noscan` is forwarded to the command unchanged. */
  def analyzeTable(tbl: String, db: Option[String] = Option.empty, noscan: Boolean = true): Unit = {
    AnalyzeTableCommand(TableIdentifier(tbl, db), noscan)
      .run(spark)
  }

  /** Fetches the catalog metadata of `tableName` from the session catalog. */
  def getCatalogTable(tableName: String): CatalogTable = {
    spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
  }
}