Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,26 @@ class CatalogFileIndex(
val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
val inputFormat = table.storage.inputFormat.getOrElse("")

val partitions = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
selectedPartitions.flatMap { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path).map { targetPath =>
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
targetPath.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
}
} else {
selectedPartitions.map { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val timeNs = System.nanoTime() - startTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,16 @@ abstract class PartitioningAwareFileIndex(
// Directory does not exist, or has no children files
Nil
}
PartitionDirectory(values, files)
// Check leaf files since they might be symlink targets
if (files == Nil) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isEmpty?

val status: Seq[FileStatus] = leafFiles.get(path) match {
case Some(existingFile) if isNonEmptyFile(existingFile) => Seq(existingFile)
case _ => Nil
}
PartitionDirectory(values, status)
} else {
PartitionDirectory(values, files)
}
}
}
logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.io.{BufferedReader, InputStreamReader, IOException}
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._

import com.google.common.io.CharStreams
import org.apache.hadoop.fs.{FileSystem, Path}

object SymlinkTextInputFormatUtil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add Javadoc to the class and the methods.


def isSymlinkTextFormat(inputFormat: String): Boolean = {
inputFormat.equals("org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat")
}

// Mostly copied from SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs of Hive 3.1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it compatible with other versions of Hive?

def getTargetPathsFromSymlink(
fileSystem: FileSystem,
symlinkDir: Path): Seq[Path] = {

val symlinkIterator = fileSystem.listFiles(symlinkDir, true)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, you should ignore the name start with "_" or ".", just like this hive implement, cause hadoop will generate temp file start with "_" or ".", we should ignore these files.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments, I will reflect your comments on this.

var targetPaths = Seq[Path]()

while (symlinkIterator.hasNext) {
val fileStatus = symlinkIterator.next()
if (fileStatus.isFile) {
val reader = new BufferedReader(
new InputStreamReader(fileSystem.open(fileStatus.getPath), UTF_8))
try {
val targets: Seq[Path] = CharStreams.readLines(reader).asScala.
map(symlinkStr => new Path(symlinkStr))
targetPaths = targetPaths ++ targets
} finally {
reader.close
}
}
}
targetPaths
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,23 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val tablePath = new Path(relation.tableMeta.location)
val fileFormat = fileFormatClass.getConstructor().newInstance()
val inputFormat = relation.tableMeta.storage.inputFormat.getOrElse("")
val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)

val symlinkTargets = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath)
} else {
Nil
}

val result = if (relation.isPartitioned) {
val partitionSchema = relation.tableMeta.partitionSchema
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
Seq(tablePath)
if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
symlinkTargets
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be replaced with SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath). I don't think we need the if statement above.

} else {
Seq(tablePath)
}
} else {
// By convention (for example, see CatalogFileIndex), the definition of a
// partitioned table's paths depends on whether that table has any actual partitions.
Expand All @@ -183,6 +195,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

if (paths.isEmpty) {
Seq(tablePath)
} else if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
paths.flatMap(path => SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path))
} else {
paths
}
Expand Down Expand Up @@ -227,11 +241,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
logicalRelation
})
} else {
val rootPath = tablePath
val rootPaths = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
symlinkTargets
} else {
Seq(tablePath)
}

withTableCreationLock(tableIdentifier, {
val cached = getCached(
tableIdentifier,
Seq(rootPath),
rootPaths,
metastoreSchema,
fileFormatClass,
None)
Expand All @@ -241,7 +260,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
LogicalRelation(
DataSource(
sparkSession = sparkSession,
paths = rootPath.toString :: Nil,
paths = rootPaths.map(rootPath => rootPath.toString),
userSpecifiedSchema = Option(updatedTable.dataSchema),
bucketSpec = None,
options = options,
Expand Down
1 change: 1 addition & 0 deletions sql/hive/src/test/resources/data/files/sample1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,2,3
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be other examples of a CSV file in the resources, can you use those ones? The same applies to Parquet and ORC.

Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions sql/hive/src/test/resources/data/files/sample2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
4,5,6
7,8,9
Binary file not shown.
Binary file not shown.
Loading