Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.spark.sql.execution.command

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.SymlinkTextInputFormatUtil
import org.apache.spark.sql.util.PartitioningUtils

/**
Expand Down Expand Up @@ -103,11 +106,14 @@ case class AnalyzePartitionCommand(
calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
}

// get real symlink table size
val isSymlinkTable = SymlinkTextInputFormatUtil.isSymlinkTextFormat(tableMeta)
lazy val fs = new Path(tableMeta.location)
.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
// Update the metastore if newly computed statistics are different from those
// recorded in the metastore.

val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier,
partitions.map(_.storage.locationUri))
val sizes = CommandUtils.calculateMultipleLocationSizes(sparkSession, tableMeta.identifier
, CommandUtils.getPartitionPaths(partitions, isSymlinkTable, fs))
val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
val newRowCount = rowCounts.get(p.spec)
val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), newRowCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex, SymlinkTextInputFormatUtil}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -74,21 +74,46 @@ object CommandUtils extends Logging {
def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
val sessionState = spark.sessionState
val startTime = System.nanoTime()

val isSymlinkTable = SymlinkTextInputFormatUtil.isSymlinkTextFormat(catalogTable)
lazy val fs = new Path(catalogTable.location)
.getFileSystem(spark.sessionState.newHadoopConf())
val totalSize = if (catalogTable.partitionColumnNames.isEmpty) {
calculateSingleLocationSize(sessionState, catalogTable.identifier,
catalogTable.storage.locationUri)
if (isSymlinkTable) {
calculateMultipleLocationSizes(spark, catalogTable.identifier
, SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths(fs, catalogTable.location)
).sum
} else {
calculateSingleLocationSize(sessionState, catalogTable.identifier,
catalogTable.storage.locationUri)
}
} else {
// Calculate table size as a sum of the visible partitions. See SPARK-21079
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
val paths = partitions.map(_.storage.locationUri)
calculateMultipleLocationSizes(spark, catalogTable.identifier, paths).sum
calculateMultipleLocationSizes(spark, catalogTable.identifier,
getPartitionPaths(partitions, isSymlinkTable, fs)).sum
}
logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" +
s" the total size for table ${catalogTable.identifier}.")
totalSize
}

def getPartitionPaths(
partitions: Seq[CatalogTablePartition],
isSymlinkTable: Boolean,
fs: => FileSystem): Seq[Option[URI]] = {
partitions.flatMap { catalogTablePartition =>
if (isSymlinkTable) {
Comment thread
CHENXCHEN marked this conversation as resolved.
catalogTablePartition.storage.locationUri
.map(SymlinkTextInputFormatUtil.getSymlinkTableLocationPaths(fs, _))
.getOrElse(Seq.empty)
} else {
Seq(catalogTablePartition.storage.locationUri)
}
}
}

def calculateSingleLocationSize(
sessionState: SessionState,
identifier: TableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,21 @@ class CatalogFileIndex(
val startTime = System.nanoTime()
val selectedPartitions = ExternalCatalogUtils.listPartitionsByFilter(
sparkSession.sessionState.conf, sparkSession.sessionState.catalog, table, filters)
val partitions = selectedPartitions.map { p =>
val isSymlinkTextFormat = SymlinkTextInputFormatUtil.isSymlinkTextFormat(table)
val partitions = selectedPartitions.flatMap { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
if (isSymlinkTextFormat) {
SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path).map { targetPath =>
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
targetPath.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
} else {
Some(PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
}
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val timeNs = System.nanoTime() - startTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,16 @@ abstract class PartitioningAwareFileIndex(
// Directory does not exist, or has no children files
Nil
}
PartitionDirectory(values, files)
// Check leaf files since they might be symlink targets
if (files.isEmpty) {
val status: Seq[FileStatus] = leafFiles.get(path) match {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are Symlink targets in leaf files? I think leaf files are listed from table root?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SymlinkTextInputFormat table specify the list of files for a table/partition based on the content of a text file.
As the example in the PR description shows, the real data file is not in the table root

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the real data file is not in the table root

Yea, but leafFiles are listed from table root, how symlink targets might be in leaf files?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the partition table, we have the same behavior, there is also symbolic in the root directory of the partition

case Some(existingFile) if isNonEmptyFile(existingFile) => Seq(existingFile)
case _ => Nil
}
PartitionDirectory(values, status)
} else {
PartitionDirectory(values, files)
}
}
}
logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.io.{BufferedReader, InputStreamReader}
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import com.google.common.io.CharStreams
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.catalog.CatalogTable

object SymlinkTextInputFormatUtil {

/**
* Determine if InputFormat is SymlinkTable
*
* @param inputFormat Table InputFormat
* @return
*/
def isSymlinkTextFormat(inputFormat: String): Boolean = {
inputFormat.equals("org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat")
}

/**
* Determine CatalogTable is SymlinkTable
*
* @param catalogTable CatalogTable
* @return
*/
def isSymlinkTextFormat(catalogTable: CatalogTable): Boolean = {
catalogTable.storage.inputFormat.exists(isSymlinkTextFormat)
}

/**
* Get symlink files from target path
* Mostly copied from BackgroundHiveSplitLoader#getTargetPathsFromSymlink of trino(prestosql)
* compatible with hive SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs
*
* @param fileSystem filesystem
* @param symlinkDir symlink table location
* @return
*/
def getTargetPathsFromSymlink(
fileSystem: FileSystem,
symlinkDir: Path): Seq[Path] = {

val symlinks = fileSystem.listStatus(symlinkDir, new PathFilter() {
override def accept(p: Path): Boolean = DataSourceUtils.isDataPath(p)
})

symlinks.flatMap {
case fileStatus if fileStatus.isFile =>
val reader = new BufferedReader(
new InputStreamReader(fileSystem.open(fileStatus.getPath), UTF_8))
try {
CharStreams.readLines(reader).asScala
.map(symlinkStr => new Path(symlinkStr))
} finally {
reader.close()
Seq.empty
}
case _ =>
Seq.empty
}
}

/**
* Get symlink uris from target path
*
* @param fileSystem filesystem
* @param location symlink table location
* @return
*/
def getSymlinkTableLocationPaths(fileSystem: FileSystem, location: URI): Seq[Option[URI]] = {
SymlinkTextInputFormatUtil
.getTargetPathsFromSymlink(fileSystem, new Path(location))
.map(path => Option(path.toUri))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,23 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
(options, None)
}

lazy val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val isSymlinkTextFormat = SymlinkTextInputFormatUtil.isSymlinkTextFormat(relation.tableMeta)

val symlinkTargets = if (isSymlinkTextFormat) {
SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When table is partitioned and lazyPruningEnabled = false, this value is not used, I think we don't need to get this value earliear

} else {
Nil
}

val result = if (relation.isPartitioned) {
val partitionSchema = relation.tableMeta.partitionSchema
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
Seq(tablePath)
if (isSymlinkTextFormat) {
symlinkTargets
} else {
Seq(tablePath)
}
} else {
// By convention (for example, see CatalogFileIndex), the definition of a
// partitioned table's paths depends on whether that table has any actual partitions.
Expand All @@ -220,6 +233,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

if (paths.isEmpty) {
Seq(tablePath)
} else if (isSymlinkTextFormat) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu also used in here when lazyPruningEnabled = false

paths.flatMap(path => SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path))
} else {
paths
}
Expand Down Expand Up @@ -264,11 +279,15 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
} else {
val rootPath = tablePath
val rootPaths = if (isSymlinkTextFormat) {
symlinkTargets
} else {
Seq(tablePath)
}
withTableCreationLock(tableIdentifier, {
val cached = getCached(
tableIdentifier,
Seq(rootPath),
rootPaths,
metastoreSchema,
fileFormatClass,
None)
Expand All @@ -278,7 +297,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
LogicalRelation(
DataSource(
sparkSession = sparkSession,
paths = rootPath.toString :: Nil,
paths = rootPaths.map(_.toString),
userSpecifiedSchema = Option(updatedTable.dataSchema),
bucketSpec = hiveBucketSpec,
// Do not interpret the 'path' option at all when tables are read using the Hive
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Spark,3.2,1
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Trino,371.0,2
Hive,2.3,3
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading