diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 91313f33a78e0..cc14958566d43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -72,12 +72,26 @@ class CatalogFileIndex( val startTime = System.nanoTime() val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( table.identifier, filters) - val partitions = selectedPartitions.map { p => - val path = new Path(p.location) - val fs = path.getFileSystem(hadoopConf) - PartitionPath( - p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), - path.makeQualified(fs.getUri, fs.getWorkingDirectory)) + val inputFormat = table.storage.inputFormat.getOrElse("") + + val partitions = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) { + selectedPartitions.flatMap { p => + val path = new Path(p.location) + val fs = path.getFileSystem(hadoopConf) + SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path).map { targetPath => + PartitionPath( + p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), + targetPath.makeQualified(fs.getUri, fs.getWorkingDirectory)) + } + } + } else { + selectedPartitions.map { p => + val path = new Path(p.location) + val fs = path.getFileSystem(hadoopConf) + PartitionPath( + p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), + path.makeQualified(fs.getUri, fs.getWorkingDirectory)) + } } val partitionSpec = PartitionSpec(partitionSchema, partitions) val timeNs = System.nanoTime() - startTime diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 5341e22f5e670..e7aadecbd2a67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -92,7 +92,16 @@ abstract class PartitioningAwareFileIndex( // Directory does not exist, or has no children files Nil } - PartitionDirectory(values, files) + // Check leaf files since they might be symlink targets + if (files == Nil) { + val status: Seq[FileStatus] = leafFiles.get(path) match { + case Some(existingFile) if isNonEmptyFile(existingFile) => Seq(existingFile) + case _ => Nil + } + PartitionDirectory(values, status) + } else { + PartitionDirectory(values, files) + } } } logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala new file mode 100644 index 0000000000000..5185a500fc922 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.nio.charset.StandardCharsets.UTF_8 + +import scala.collection.JavaConverters._ + +import com.google.common.io.CharStreams +import org.apache.hadoop.fs.{FileSystem, Path} + +object SymlinkTextInputFormatUtil { + + def isSymlinkTextFormat(inputFormat: String): Boolean = { + inputFormat.equals("org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") + } + + // Mostly copied from SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs of Hive 3.1 + def getTargetPathsFromSymlink( + fileSystem: FileSystem, + symlinkDir: Path): Seq[Path] = { + + val symlinkIterator = fileSystem.listFiles(symlinkDir, true) + var targetPaths = Seq[Path]() + + while (symlinkIterator.hasNext) { + val fileStatus = symlinkIterator.next() + if (fileStatus.isFile) { + val reader = new BufferedReader( + new InputStreamReader(fileSystem.open(fileStatus.getPath), UTF_8)) + try { + val targets: Seq[Path] = CharStreams.readLines(reader).asScala. + map(symlinkStr => new Path(symlinkStr)) + targetPaths = targetPaths ++ targets + } finally { + reader.close + } + } + } + targetPaths + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index a89243c331c7b..f568a887c0c6b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -166,11 +166,23 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions val tablePath = new Path(relation.tableMeta.location) val fileFormat = fileFormatClass.getConstructor().newInstance() + val inputFormat = relation.tableMeta.storage.inputFormat.getOrElse("") + val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration) + + val symlinkTargets = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) { + SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath) + } else { + Nil + } val result = if (relation.isPartitioned) { val partitionSchema = relation.tableMeta.partitionSchema val rootPaths: Seq[Path] = if (lazyPruningEnabled) { - Seq(tablePath) + if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) { + symlinkTargets + } else { + Seq(tablePath) + } } else { // By convention (for example, see CatalogFileIndex), the definition of a // partitioned table's paths depends on whether that table has any actual partitions. @@ -183,6 +195,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (paths.isEmpty) { Seq(tablePath) + } else if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) { + paths.flatMap(path => SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path)) } else { paths } @@ -227,11 +241,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } else { - val rootPath = tablePath + val rootPaths = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) { + symlinkTargets + } else { + Seq(tablePath) + } + withTableCreationLock(tableIdentifier, { val cached = getCached( tableIdentifier, - Seq(rootPath), + rootPaths, metastoreSchema, fileFormatClass, None) @@ -241,7 +260,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log LogicalRelation( DataSource( sparkSession = sparkSession, - paths = rootPath.toString :: Nil, + paths = rootPaths.map(rootPath => rootPath.toString), userSpecifiedSchema = Option(updatedTable.dataSchema), bucketSpec = None, options = options, diff --git a/sql/hive/src/test/resources/data/files/sample1.csv b/sql/hive/src/test/resources/data/files/sample1.csv new file mode 100644 index 0000000000000..3fde4e20239e0 --- /dev/null +++ b/sql/hive/src/test/resources/data/files/sample1.csv @@ -0,0 +1 @@ +1,2,3 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/sample1.snappy.orc b/sql/hive/src/test/resources/data/files/sample1.snappy.orc new file mode 100644 index 0000000000000..31437dfbad470 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/sample1.snappy.orc differ diff --git a/sql/hive/src/test/resources/data/files/sample1.snappy.parquet b/sql/hive/src/test/resources/data/files/sample1.snappy.parquet new file mode 100644 index 0000000000000..5425c096c63d1 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/sample1.snappy.parquet differ diff --git a/sql/hive/src/test/resources/data/files/sample2.csv b/sql/hive/src/test/resources/data/files/sample2.csv new file mode 100644 index 0000000000000..43dc4b823c996 --- /dev/null +++ b/sql/hive/src/test/resources/data/files/sample2.csv @@ -0,0 +1,2 @@ +4,5,6 +7,8,9 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/sample2.snappy.orc b/sql/hive/src/test/resources/data/files/sample2.snappy.orc new file mode 100644 index 0000000000000..40ddc63517429 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/sample2.snappy.orc differ diff --git a/sql/hive/src/test/resources/data/files/sample2.snappy.parquet b/sql/hive/src/test/resources/data/files/sample2.snappy.parquet new file mode 100644 index 0000000000000..54e58ec0ba06e Binary files /dev/null and b/sql/hive/src/test/resources/data/files/sample2.snappy.parquet differ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala new file mode 100644 index 0000000000000..6f8e03edb011e --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.{FileWriter, PrintWriter} + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf + +class SymlinkSuite extends QueryTest with TestHiveSingleton { + + test("symlink csv") { + withTempDir { temp => + val drop = "DROP TABLE IF EXISTS symlink_csv" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_csv( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val testSymlinkPath = s"${temp.getCanonicalPath}/symlink_csv.txt" + val prefix = System.getProperty("user.dir") + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.csv\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.csv\n" + val writer = new PrintWriter(new FileWriter(testSymlinkPath, false)) + writer.write(testData1) + writer.write(testData2) + writer.close() + + val load = s"LOAD DATA LOCAL INPATH '${testSymlinkPath}' INTO TABLE symlink_csv" + spark.sql(load) + + val dml = "SELECT * FROM symlink_csv" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + } + } + + test("symlink csv partitioned") { + withTempDir { temp => + val drop = "DROP TABLE IF EXISTS symlink_csv_partitioned" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_csv_partitioned( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |PARTITIONED BY (dt STRING) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val testSymlinkPath1 = s"${temp.getCanonicalPath}/symlink_csv1.txt" + val testSymlinkPath2 = s"${temp.getCanonicalPath}/symlink_csv2.txt" + val prefix = System.getProperty("user.dir") + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.csv\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.csv\n" + val writer1 = new PrintWriter(new FileWriter(testSymlinkPath1, false)) + val writer2 = new PrintWriter(new FileWriter(testSymlinkPath2, false)) + writer1.write(testData1) + writer2.write(testData2) + writer1.close() + writer2.close() + + val load1 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath1}' " + + "INTO TABLE symlink_csv_partitioned PARTITION (dt=20200726)" + val load2 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath2}' " + + "INTO TABLE symlink_csv_partitioned PARTITION (dt=20200727)" + spark.sql(load1) + spark.sql(load2) + + val dml = "SELECT * FROM symlink_csv_partitioned" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + } + } + + test("symlink orc") { + withTempDir { temp => + val drop = "DROP TABLE IF EXISTS symlink_orc" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_orc( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val testSymlinkPath = s"${temp.getCanonicalPath}/symlink_orc.txt" + val prefix = System.getProperty("user.dir") + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.snappy.orc\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.snappy.orc\n" + val writer = new PrintWriter(new FileWriter(testSymlinkPath, false)) + writer.write(testData1) + writer.write(testData2) + writer.close() + + val load = s"LOAD DATA LOCAL INPATH '${testSymlinkPath}' INTO TABLE symlink_orc" + spark.sql(load) + + val dml = "SELECT * FROM symlink_orc" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + } + } + + test("symlink orc partitioned") { + withTempDir { temp => + val drop = "DROP TABLE IF EXISTS symlink_orc_partitioned" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_orc_partitioned( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |PARTITIONED BY (dt STRING) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val testSymlinkPath1 = s"${temp.getCanonicalPath}/symlink_orc1.txt" + val testSymlinkPath2 = s"${temp.getCanonicalPath}/symlink_orc2.txt" + val prefix = System.getProperty("user.dir") + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.snappy.orc\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.snappy.orc\n" + val writer1 = new PrintWriter(new FileWriter(testSymlinkPath1, false)) + val writer2 = new PrintWriter(new FileWriter(testSymlinkPath2, false)) + writer1.write(testData1) + writer2.write(testData2) + writer1.close() + writer2.close() + + val load1 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath1}' " + + "INTO TABLE symlink_orc_partitioned PARTITION (dt=20200726)" + val load2 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath2}' " + + "INTO TABLE symlink_orc_partitioned PARTITION (dt=20200727)" + spark.sql(load1) + spark.sql(load2) + + val dml = "SELECT * FROM symlink_orc_partitioned" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + } + } + + test("symlink orc partitioned, lazy pruning disabled") { + withTempDir { temp => + spark.conf.set(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key, false) + + val drop = "DROP TABLE IF EXISTS symlink_orc_partitioned_nonlazy" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_orc_partitioned_nonlazy( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |PARTITIONED BY (dt STRING) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val testSymlinkPath1 = s"${temp.getCanonicalPath}/symlink_orc1.txt" + val testSymlinkPath2 = s"${temp.getCanonicalPath}/symlink_orc2.txt" + val prefix = System.getProperty("user.dir") + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.snappy.orc\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.snappy.orc\n" + val writer1 = new PrintWriter(new FileWriter(testSymlinkPath1, false)) + val writer2 = new PrintWriter(new FileWriter(testSymlinkPath2, false)) + writer1.write(testData1) + writer2.write(testData2) + writer1.close() + writer2.close() + + val load1 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath1}' " + + "INTO TABLE symlink_orc_partitioned_nonlazy PARTITION (dt=20200726)" + val load2 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath2}' " + + "INTO TABLE symlink_orc_partitioned_nonlazy PARTITION (dt=20200727)" + spark.sql(load1) + spark.sql(load2) + + val dml = "SELECT * FROM symlink_orc_partitioned_nonlazy" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + + spark.conf.set(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key, true) + } + } + + test("symlink parquet") { + withTempDir { temp => + val drop = "DROP TABLE IF EXISTS symlink_parquet" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_parquet( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val testSymlinkPath = s"${temp.getCanonicalPath}/symlink_parquet.txt" + val prefix = System.getProperty("user.dir") + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.snappy.parquet\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.snappy.parquet\n" + val writer = new PrintWriter(new FileWriter(testSymlinkPath, false)) + writer.write(testData1) + writer.write(testData2) + writer.close() + + val load = s"LOAD DATA LOCAL INPATH '${testSymlinkPath}' INTO TABLE symlink_parquet" + spark.sql(load) + + val dml = "SELECT * FROM symlink_parquet" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + } + } + + test("symlink parquet partitioned") { + withTempDir { temp => + val drop = "DROP TABLE IF EXISTS symlink_parquet_partitioned" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_parquet_partitioned( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |PARTITIONED BY (dt STRING) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val prefix = System.getProperty("user.dir") + val testSymlinkPath1 = s"${temp.getCanonicalPath}/symlink_parquet1.txt" + val testSymlinkPath2 = s"${temp.getCanonicalPath}/symlink_parquet2.txt" + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.snappy.parquet\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.snappy.parquet\n" + val writer1 = new PrintWriter(new FileWriter(testSymlinkPath1, false)) + val writer2 = new PrintWriter(new FileWriter(testSymlinkPath2, false)) + writer1.write(testData1) + writer2.write(testData2) + writer1.close() + writer2.close() + + val load1 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath1}' " + + "INTO TABLE symlink_parquet_partitioned PARTITION (dt=20200726)" + val load2 = s"LOAD DATA LOCAL INPATH '${testSymlinkPath2}' " + + "INTO TABLE symlink_parquet_partitioned PARTITION (dt=20200727)" + spark.sql(load1) + spark.sql(load2) + + val dml = "SELECT * FROM symlink_parquet_partitioned" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + } + } +}