From c97f0031eb7c18d53ef6c302213e8766cb5d2e99 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Fri, 31 Jul 2020 12:25:07 +0900 Subject: [PATCH] [SPARK-32432] Added support for reading ORC/Parquet files with SymlinkTextInputFormat --- .../datasources/CatalogFileIndex.scala | 26 +- .../PartitioningAwareFileIndex.scala | 11 +- .../SymlinkTextInputFormatUtil.scala | 58 ++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 27 +- .../src/test/resources/data/files/sample1.csv | 1 + .../resources/data/files/sample1.snappy.orc | Bin 0 -> 337 bytes .../data/files/sample1.snappy.parquet | Bin 0 -> 883 bytes .../src/test/resources/data/files/sample2.csv | 2 + .../resources/data/files/sample2.snappy.orc | Bin 0 -> 348 bytes .../data/files/sample2.snappy.parquet | Bin 0 -> 907 bytes .../apache/spark/sql/hive/SymlinkSuite.scala | 322 ++++++++++++++++++ 11 files changed, 436 insertions(+), 11 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala create mode 100644 sql/hive/src/test/resources/data/files/sample1.csv create mode 100644 sql/hive/src/test/resources/data/files/sample1.snappy.orc create mode 100644 sql/hive/src/test/resources/data/files/sample1.snappy.parquet create mode 100644 sql/hive/src/test/resources/data/files/sample2.csv create mode 100644 sql/hive/src/test/resources/data/files/sample2.snappy.orc create mode 100644 sql/hive/src/test/resources/data/files/sample2.snappy.parquet create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/SymlinkSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 91313f33a78e0..cc14958566d43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala 
@@ -72,12 +72,25 @@ class CatalogFileIndex(
       val startTime = System.nanoTime()
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
-      val partitions = selectedPartitions.map { p =>
-        val path = new Path(p.location)
-        val fs = path.getFileSystem(hadoopConf)
-        PartitionPath(
-          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+      val isSymlinkTable =
+        SymlinkTextInputFormatUtil.isSymlinkTextFormat(table.storage.inputFormat.getOrElse(""))
+
+      val partitions = selectedPartitions.flatMap { p =>
+        val path = new Path(p.location)
+        val fs = path.getFileSystem(hadoopConf)
+        // Build the partition row once; a symlink partition fans out to several paths.
+        val partitionValues =
+          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone)
+        // For SymlinkTextInputFormat the partition location only holds manifest files, so
+        // the data paths are the targets listed inside them rather than the location itself.
+        val dataPaths = if (isSymlinkTable) {
+          SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path)
+        } else {
+          Seq(path)
+        }
+        dataPaths.map { dataPath =>
+          PartitionPath(partitionValues, dataPath.makeQualified(fs.getUri, fs.getWorkingDirectory))
+        }
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       val timeNs = System.nanoTime() - startTime
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 5341e22f5e670..e7aadecbd2a67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -92,7 +92,13 @@ abstract class PartitioningAwareFileIndex(
               // Directory does not exist, or has no children files
               Nil
           }
-          PartitionDirectory(values, files)
+          if (files.nonEmpty) {
+            PartitionDirectory(values, files)
+          } else {
+            // No directory with children matched `path`; it may itself be a data file
+            // (for example a symlink target), so fall back to the leaf-file listing.
+            PartitionDirectory(values, leafFiles.get(path).filter(isNonEmptyFile).toSeq)
+          }
       }
     }
     logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala
new file mode 100644
index 0000000000000..5185a500fc922
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SymlinkTextInputFormatUtil.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.CharStreams
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+/**
+ * Helpers for tables declared with Hive's `SymlinkTextInputFormat`, whose location holds
+ * manifest ("symlink") text files listing one data-file path per line instead of the data.
+ */
+object SymlinkTextInputFormatUtil {
+
+  /** Fully-qualified class name Hive records as the input format of a symlink table. */
+  private val SymlinkInputFormatClassName =
+    "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat"
+
+  /**
+   * Returns true when `inputFormat` names Hive's `SymlinkTextInputFormat`.
+   * A null or empty name yields false.
+   */
+  def isSymlinkTextFormat(inputFormat: String): Boolean = {
+    SymlinkInputFormatClassName == inputFormat
+  }
+
+  /**
+   * Reads every manifest file found (recursively) under `symlinkDir` and returns the paths
+   * listed in them, in listing order and exactly as written (they are not qualified here).
+   *
+   * Mostly follows SymlinkTextInputFormat#getTargetPathsFromSymlinksDirs of Hive 3.1. Like
+   * Hive, files whose name starts with `_` or `.` (e.g. `_SUCCESS`) are not treated as
+   * manifests. Blank lines are skipped, because `new Path("")` throws.
+   *
+   * @param fileSystem file system that hosts `symlinkDir`
+   * @param symlinkDir table or partition location containing the manifest files
+   * @return the target paths; empty when no manifest entry is found
+   */
+  def getTargetPathsFromSymlink(
+      fileSystem: FileSystem,
+      symlinkDir: Path): Seq[Path] = {
+    val symlinkIterator = fileSystem.listFiles(symlinkDir, true)
+    // A builder keeps accumulation linear instead of re-copying a Seq for every manifest.
+    val targetPaths = Seq.newBuilder[Path]
+
+    while (symlinkIterator.hasNext) {
+      val fileStatus = symlinkIterator.next()
+      if (fileStatus.isFile && !isHiddenFile(fileStatus.getPath)) {
+        val reader = new BufferedReader(
+          new InputStreamReader(fileSystem.open(fileStatus.getPath), UTF_8))
+        try {
+          targetPaths ++= CharStreams.readLines(reader).asScala
+            .filter(_.trim.nonEmpty)
+            .map(symlinkStr => new Path(symlinkStr))
+        } finally {
+          reader.close()
+        }
+      }
+    }
+    targetPaths.result()
+  }
+
+  /** Mirrors Hive's hidden-file filter: names starting with `_` or `.` are not manifests. */
+  private def isHiddenFile(path: Path): Boolean = {
+    val name = path.getName
+    name.startsWith("_") || name.startsWith(".")
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index a89243c331c7b..f568a887c0c6b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -166,11 +166,23 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val tablePath = new Path(relation.tableMeta.location)
     val fileFormat = fileFormatClass.getConstructor().newInstance()
+    val inputFormat = relation.tableMeta.storage.inputFormat.getOrElse("")
+    val fs = tablePath.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
+    // Lazy so the manifests under the table root are only read by the branches that use them.
+    lazy val symlinkTargets = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
+      SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, tablePath)
+    } else {
+      Nil
+    }
 
     val result = if (relation.isPartitioned) {
       val partitionSchema = relation.tableMeta.partitionSchema
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
-        Seq(tablePath)
+        if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) {
+          symlinkTargets
+        } else {
+          Seq(tablePath)
+        }
       } else {
         // By convention (for example, see CatalogFileIndex), the definition of a
         // partitioned table's paths depends on whether that table has any actual partitions.
@@ -183,6 +195,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (paths.isEmpty) { Seq(tablePath) + } else if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) { + paths.flatMap(path => SymlinkTextInputFormatUtil.getTargetPathsFromSymlink(fs, path)) } else { paths } @@ -227,11 +241,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } else { - val rootPath = tablePath + val rootPaths = if (SymlinkTextInputFormatUtil.isSymlinkTextFormat(inputFormat)) { + symlinkTargets + } else { + Seq(tablePath) + } + withTableCreationLock(tableIdentifier, { val cached = getCached( tableIdentifier, - Seq(rootPath), + rootPaths, metastoreSchema, fileFormatClass, None) @@ -241,7 +260,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log LogicalRelation( DataSource( sparkSession = sparkSession, - paths = rootPath.toString :: Nil, + paths = rootPaths.map(rootPath => rootPath.toString), userSpecifiedSchema = Option(updatedTable.dataSchema), bucketSpec = None, options = options, diff --git a/sql/hive/src/test/resources/data/files/sample1.csv b/sql/hive/src/test/resources/data/files/sample1.csv new file mode 100644 index 0000000000000..3fde4e20239e0 --- /dev/null +++ b/sql/hive/src/test/resources/data/files/sample1.csv @@ -0,0 +1 @@ +1,2,3 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/sample1.snappy.orc b/sql/hive/src/test/resources/data/files/sample1.snappy.orc new file mode 100644 index 0000000000000000000000000000000000000000..31437dfbad470726203013687c26e7bd880af078 GIT binary patch literal 337 zcmZvYO-chn5QSfL_f(2@B9%5^J1B%%4Y&y+A!L(aAk4~yxXB_&)U5|_Id}q>-oQI} zE)QTc{vepedsV!G$44#K^B&+iIH48J(|Y7BfRGCX7NaJbJ-JgC-JYD4Bd^c_uFxc3 zVAlHX38+s!3oGJ5CPFc00@D@POe#`}oUWuPscqz71WD#cx4?MoN4}->hkw~N+|~XU z&xsDc0XF;&Jidz6H!6G@H!%#R6_qM^uw-mOcLm)S^iUdcJ+~dwr0t&k>(;jWOfP?{ 
aXD`<^WS3skG+G+u)y0sXKdfD@=f|H)s2s3?&zncRm}HmDpXRSuK7h?Zb7iIIMDb(BtN^7N4Uk36Ed25|KR zX^`$twmW^GBHhsYb;ZC{y`$8CE{ZTxk@^nR4_H_Fg*AmbV3KZ#IRMUws;^*)$V$W& z6i7LDp$=G8My^o_>6egeB#?6YLLD%ygj7E(DgwHpH7Ci)qEjl|vNpz9=`6H_gR?o$ ztYq?HMLeSc&8YST7bJU~a?&GdmdyQ(w8(JG!yqNSy9J49l&7eO?yAB`JQBBf9)^^K zA}P9fi%1~pEtbpHPb2$pqyM+Y`z6q>cLSbAiTTWvbUcYo$8v1TF&k_?4g-^Rykax literal 0 HcmV?d00001 diff --git a/sql/hive/src/test/resources/data/files/sample2.csv b/sql/hive/src/test/resources/data/files/sample2.csv new file mode 100644 index 0000000000000..43dc4b823c996 --- /dev/null +++ b/sql/hive/src/test/resources/data/files/sample2.csv @@ -0,0 +1,2 @@ +4,5,6 +7,8,9 \ No newline at end of file diff --git a/sql/hive/src/test/resources/data/files/sample2.snappy.orc b/sql/hive/src/test/resources/data/files/sample2.snappy.orc new file mode 100644 index 0000000000000000000000000000000000000000..40ddc63517429781f1117e5eba3b1b74b3501d86 GIT binary patch literal 348 zcmaKmu}T9$7=-8l_ipcZy@Wrj=(31}s{%Gxs7R`WfLxV?*j$lDP3a?81Rp>weFxvd zXYv7DjwlwvVwhnDhHtT&4FG3-CcMN&ibu`>@TIV~BH9lpdwQoD;-0=#AI_I3fLnS! 
z0OsVhv(i5U&6%@e72z-!p%8Kb_XH-Fnv{Y~D=CK}m&Sy`vml9ov;l6`?%H*vuK$;> zwLKl~nA#%c9YHb{CF^(-BuHr8mST(tV_dNRJguHptdBkGuQvzxS8F b_mj7I1KA|#?N%#BIl8Lt%a<{W)$H^e7TFS6on^gtP$K!NFa+cU{Q!6u{3Q;H||7S6mcgaGs#qo>5utQq!fG_7cTn} zzC@qGC-By0WFUgnA!H`!p8LT)Nd~8vK1SHVo`W?w)Uj@x`YzfmQ`gFqfewFBp=eyG zaIe*bQM6W}dWkai9SK@hC8#h8Qx&@R1FcKYswzQ+QJAo5U7YP_Gnzi!X3;$#Q(Q+_ zJ5Wu9mOCr=2pO7XTm7bPW6f&ob!34FtA=L0DaO0f)cae6XW1|h!t4B+OGP3wiMUh% zDdzt(IvKfAC1fNaS1KUI^d=0kS?gC^Rgr`=K5K>sf_XGe?_4^gke2%$oSQu+E+qI^ z@Z3$O&+eG#G^RN&~6wH tyTdSSov?rpgP_%ly}%1X-u9x9b;6FvIE(!wt8XK;xzH + val drop = "DROP TABLE IF EXISTS symlink_csv" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_csv( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' + |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' + """.stripMargin + spark.sql(ddl) + + val testSymlinkPath = s"${temp.getCanonicalPath}/symlink_csv.txt" + val prefix = System.getProperty("user.dir") + val testData1 = s"file://$prefix/src/test/resources/data/files/sample1.csv\n" + val testData2 = s"file://$prefix/src/test/resources/data/files/sample2.csv\n" + val writer = new PrintWriter(new FileWriter(testSymlinkPath, false)) + writer.write(testData1) + writer.write(testData2) + writer.close() + + val load = s"LOAD DATA LOCAL INPATH '${testSymlinkPath}' INTO TABLE symlink_csv" + spark.sql(load) + + val dml = "SELECT * FROM symlink_csv" + val df = spark.sql(dml) + + df.show() + assert(df.count() == 3) + } + } + + test("symlink csv partitioned") { + withTempDir { temp => + val drop = "DROP TABLE IF EXISTS symlink_csv_partitioned" + spark.sql(drop) + + val ddl = + s"""CREATE TABLE symlink_csv_partitioned( + |a BIGINT, + |b BIGINT, + |c BIGINT + |) + |PARTITIONED BY (dt STRING) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + |STORED AS INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+        |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+      """.stripMargin
+      spark.sql(ddl)
+
+      loadSampleManifests(temp, "symlink_csv_partitioned", "csv", partitioned = true)
+      assert(spark.sql("SELECT * FROM symlink_csv_partitioned").count() == 3)
+    }
+  }
+
+  test("symlink orc") {
+    withTempDir { temp =>
+      checkSymlinkRead(temp, "symlink_orc", orcSerde, "snappy.orc", partitioned = false)
+    }
+  }
+
+  test("symlink orc partitioned") {
+    withTempDir { temp =>
+      checkSymlinkRead(
+        temp, "symlink_orc_partitioned", orcSerde, "snappy.orc", partitioned = true)
+    }
+  }
+
+  test("symlink orc partitioned, lazy pruning disabled") {
+    withTempDir { temp =>
+      val key = SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key
+      val previous = spark.conf.get(key)
+      spark.conf.set(key, false)
+      try {
+        checkSymlinkRead(
+          temp, "symlink_orc_partitioned_nonlazy", orcSerde, "snappy.orc", partitioned = true)
+      } finally {
+        // Restore even when the check fails, so later tests see the setting they expect.
+        spark.conf.set(key, previous)
+      }
+    }
+  }
+
+  test("symlink parquet") {
+    withTempDir { temp =>
+      checkSymlinkRead(
+        temp, "symlink_parquet", parquetSerde, "snappy.parquet", partitioned = false)
+    }
+  }
+
+  test("symlink parquet partitioned") {
+    withTempDir { temp =>
+      checkSymlinkRead(
+        temp, "symlink_parquet_partitioned", parquetSerde, "snappy.parquet", partitioned = true)
+    }
+  }
+
+  private val orcSerde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
+  private val parquetSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+
+  /**
+   * Creates `table` on top of SymlinkTextInputFormat, loads manifests that point at the two
+   * bundled sample files, and checks that their three rows are read back. The table is
+   * dropped afterwards, whether or not the check passed.
+   *
+   * @param dir scratch directory in which the manifest files are written
+   * @param table name of the table to create
+   * @param serde SerDe class used in the table DDL
+   * @param extension suffix of the sample files, e.g. "csv" or "snappy.orc"
+   * @param partitioned whether to create the table partitioned by `dt`
+   */
+  private def checkSymlinkRead(
+      dir: java.io.File,
+      table: String,
+      serde: String,
+      extension: String,
+      partitioned: Boolean): Unit = {
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    try {
+      val partitionClause = if (partitioned) "PARTITIONED BY (dt STRING)" else ""
+      spark.sql(
+        s"""CREATE TABLE $table(
+           |a BIGINT,
+           |b BIGINT,
+           |c BIGINT
+           |)
+           |$partitionClause
+           |ROW FORMAT SERDE '$serde'
+           |STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
+           |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
+         """.stripMargin)
+      loadSampleManifests(dir, table, extension, partitioned)
+      assert(spark.sql(s"SELECT * FROM $table").count() == 3)
+    } finally {
+      spark.sql(s"DROP TABLE IF EXISTS $table")
+    }
+  }
+
+  /**
+   * Writes manifest files naming `sample1.<extension>` and `sample2.<extension>` and loads
+   * them into `table`: one manifest listing both files for an unpartitioned table, or one
+   * manifest per file loaded into partitions dt=20200726 and dt=20200727 otherwise.
+   */
+  private def loadSampleManifests(
+      dir: java.io.File,
+      table: String,
+      extension: String,
+      partitioned: Boolean): Unit = {
+    val prefix = System.getProperty("user.dir")
+    val targets = Seq("sample1", "sample2").map { name =>
+      s"file://$prefix/src/test/resources/data/files/$name.$extension"
+    }
+    if (partitioned) {
+      targets.zip(Seq("20200726", "20200727")).foreach { case (target, dt) =>
+        val manifest = writeManifest(dir, s"${table}_$dt.txt", Seq(target))
+        spark.sql(
+          s"LOAD DATA LOCAL INPATH '$manifest' INTO TABLE $table PARTITION (dt=$dt)")
+      }
+    } else {
+      val manifest = writeManifest(dir, s"$table.txt", targets)
+      spark.sql(s"LOAD DATA LOCAL INPATH '$manifest' INTO TABLE $table")
+    }
+  }
+
+  /**
+   * Writes `targets`, one per line, to the file `name` inside `dir` and returns its path.
+   * The writer is closed even when a write fails.
+   */
+  private def writeManifest(dir: java.io.File, name: String, targets: Seq[String]): String = {
+    val manifest = s"${dir.getCanonicalPath}/$name"
+    val writer = new PrintWriter(new FileWriter(manifest, false))
+    try {
+      targets.foreach(target => writer.write(target + "\n"))
+    } finally {
+      writer.close()
+    }
+    manifest
+  }
+}