From 53e9b6ba7e2bf9d5552013280d620ab908e37d87 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 3 May 2019 22:18:07 -0700 Subject: [PATCH 1/8] globFilter --- docs/sql-data-sources-binaryFile.md | 24 ++----- .../apache/spark/sql/DataFrameReader.scala | 9 +++ .../execution/datasources/DataSource.scala | 3 +- .../PartitioningAwareFileIndex.scala | 11 ++- .../binaryfile/BinaryFileFormat.scala | 70 +++++++------------ .../streaming/FileStreamSource.scala | 4 +- .../streaming/MetadataLogFileIndex.scala | 3 +- .../sql/streaming/DataStreamReader.scala | 9 +++ .../spark/sql/FileBasedDataSourceSuite.scala | 32 +++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 19 +++++ 10 files changed, 115 insertions(+), 69 deletions(-) diff --git a/docs/sql-data-sources-binaryFile.md b/docs/sql-data-sources-binaryFile.md index d861a24219be0..65b398208929c 100644 --- a/docs/sql-data-sources-binaryFile.md +++ b/docs/sql-data-sources-binaryFile.md @@ -28,28 +28,14 @@ It produces a DataFrame with the following columns and possibly partition column * `length`: LongType * `content`: BinaryType -It supports the following read option: - - - - - - - -
-<table class="table">
-  <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
-  <tr>
-    <td>pathGlobFilter</td>
-    <td>none (accepts all)</td>
-    <td>
-      An optional glob pattern to only include files with paths matching the pattern.
-      The syntax follows org.apache.hadoop.fs.GlobFilter.
-      It does not change the behavior of partition discovery.
-    </td>
-  </tr>
-</table>
- To read whole binary files, you need to specify the data source `format` as `binaryFile`. -For example, the following code reads all PNG files from the input directory: +For example, the following code reads all the files from the input directory:
{% highlight scala %} -spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data") +spark.read.format("binaryFile").load("/path/to/data") {% endhighlight %}
@@ -57,21 +43,21 @@ spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to
{% highlight java %} -spark.read().format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data"); +spark.read().format("binaryFile").load("/path/to/data"); {% endhighlight %}
{% highlight python %} -spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data") +spark.read.format("binaryFile").load("/path/to/data") {% endhighlight %}
{% highlight r %} -read.df("/path/to/data", source = "binaryFile", pathGlobFilter = "*.png") +read.df("/path/to/data", source = "binaryFile") {% endhighlight %}
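The option table removed above becomes a general data source option in this patch, handled by the shared file index rather than the binary file source. A minimal sketch of the glob semantics it describes, assuming Hadoop's `GlobFilter` matches the pattern against the file-name component of each listed path; the paths below are illustrative:

{% highlight scala %}
// Sketch only: how a "*.png" pathGlobFilter is evaluated per listed file,
// assuming org.apache.hadoop.fs.GlobFilter matches against the file name.
import org.apache.hadoop.fs.{GlobFilter, Path}

val filter = new GlobFilter("*.png")

filter.accept(new Path("/path/to/data/year=2019/img-001.png"))  // true: kept
filter.accept(new Path("/path/to/data/year=2019/notes.txt"))    // false: skipped

// Only files are filtered; the year=2019 directory is still visited, so
// partition discovery (and the resulting partition column) is unchanged.
{% endhighlight %}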
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 8460c7902e7d3..dfc6d8ce96a6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -98,6 +98,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * * @since 1.4.0 @@ -135,6 +138,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * * @since 1.4.0 @@ -151,6 +157,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * * @since 1.4.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 52f30acd4822d..ef430f408b549 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -348,7 +348,8 @@ case class DataSource( sparkSession.sessionState.newHadoopConf(), sparkSession.sessionState.conf) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema) + val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, + caseInsensitiveOptions, userSpecifiedSchema) val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 29b304a1e4879..3c932555179fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -56,6 +56,12 @@ abstract class PartitioningAwareFileIndex( protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] + protected lazy val pathGlobFilter = parameters.get("pathGlobFilter").map(new GlobFilter(_)) + + protected def matchGlobPattern(file: FileStatus): Boolean = { + pathGlobFilter.forall(_.accept(file.getPath)) + } + override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { def isNonEmptyFile(f: FileStatus): Boolean = { @@ -69,7 +75,7 @@ abstract class PartitioningAwareFileIndex( val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { case Some(existingDir) => // Directory has children files in it, return them - existingDir.filter(isNonEmptyFile) + existingDir.filter(f => matchGlobPattern(f) && isNonEmptyFile(f)) case None => // Directory does not exist, or has no children files @@ -89,7 +95,7 @@ abstract class PartitioningAwareFileIndex( override def sizeInBytes: Long = allFiles().map(_.getLen).sum def allFiles(): Seq[FileStatus] = { - if (partitionSpec().partitionColumns.isEmpty) { + val files = if (partitionSpec().partitionColumns.isEmpty) { // For each of the root input paths, get the list of files inside them rootPaths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles). 
@@ -118,6 +124,7 @@ abstract class PartitioningAwareFileIndex( } else { leafFiles.values.toSeq } + files.filter(matchGlobPattern) } protected def inferPartitioning(): PartitionSpec = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala index 2637784554933..cdc7cd53c8b64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala @@ -47,12 +47,10 @@ import org.apache.spark.util.SerializableConfiguration * {{{ * // Scala * val df = spark.read.format("binaryFile") - * .option("pathGlobFilter", "*.png") * .load("/path/to/fileDir") * * // Java * Dataset df = spark.read().format("binaryFile") - * .option("pathGlobFilter", "*.png") * .load("/path/to/fileDir"); * }}} */ @@ -98,44 +96,37 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val binaryFileSourceOptions = new BinaryFileSourceOptions(options) - val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter val filterFuncs = filters.map(filter => createFilterFunction(filter)) val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) file: PartitionedFile => { val path = new Path(file.filePath) - // TODO: Improve performance here: each file will recompile the glob pattern here. - if (pathGlobPattern.forall(new GlobFilter(_).accept(path))) { - val fs = path.getFileSystem(broadcastedHadoopConf.value.value) - val status = fs.getFileStatus(path) - if (filterFuncs.forall(_.apply(status))) { - val writer = new UnsafeRowWriter(requiredSchema.length) - writer.resetRowWriter() - requiredSchema.fieldNames.zipWithIndex.foreach { - case (PATH, i) => writer.write(i, UTF8String.fromString(status.getPath.toString)) - case (LENGTH, i) => writer.write(i, status.getLen) - case (MODIFICATION_TIME, i) => - writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime)) - case (CONTENT, i) => - if (status.getLen > maxLength) { - throw new SparkException( - s"The length of ${status.getPath} is ${status.getLen}, " + - s"which exceeds the max length allowed: ${maxLength}.") - } - val stream = fs.open(status.getPath) - try { - writer.write(i, ByteStreams.toByteArray(stream)) - } finally { - Closeables.close(stream, true) - } - case (other, _) => - throw new RuntimeException(s"Unsupported field name: ${other}") - } - Iterator.single(writer.getRow) - } else { - Iterator.empty + val fs = path.getFileSystem(broadcastedHadoopConf.value.value) + val status = fs.getFileStatus(path) + if (filterFuncs.forall(_.apply(status))) { + val writer = new UnsafeRowWriter(requiredSchema.length) + writer.resetRowWriter() + requiredSchema.fieldNames.zipWithIndex.foreach { + case (PATH, i) => writer.write(i, UTF8String.fromString(status.getPath.toString)) + case (LENGTH, i) => writer.write(i, status.getLen) + case (MODIFICATION_TIME, i) => + writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime)) + case (CONTENT, i) => + if (status.getLen > maxLength) { + throw new SparkException( + s"The length of ${status.getPath} is ${status.getLen}, " + + s"which exceeds the max length allowed: ${maxLength}.") + } + val stream = fs.open(status.getPath) + try { + writer.write(i, ByteStreams.toByteArray(stream)) + } finally 
{ + Closeables.close(stream, true) + } + case (other, _) => + throw new RuntimeException(s"Unsupported field name: ${other}") } + Iterator.single(writer.getRow) } else { Iterator.empty } @@ -204,14 +195,3 @@ object BinaryFileFormat { } } -class BinaryFileSourceOptions( - @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable { - - def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) - - /** - * An optional glob pattern to only include files with paths matching the pattern. - * The syntax follows [[org.apache.hadoop.fs.GlobFilter]]. - */ - val pathGlobFilter: Option[String] = parameters.get("pathGlobFilter") -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index cef814b5b6d24..67e26dc1a2dbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.types.StructType @@ -195,7 +196,8 @@ class FileStreamSource( private def allFilesUsingMetadataLogFileIndex() = { // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a // non-glob path - new MetadataLogFileIndex(sparkSession, qualifiedBasePath, None).allFiles() + new MetadataLogFileIndex(sparkSession, qualifiedBasePath, + CaseInsensitiveMap(options), None).allFiles() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala index 80eed7b277216..6eaccfb6d9347 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.types.StructType class MetadataLogFileIndex( sparkSession: SparkSession, path: Path, + parameters: Map[String, String], userSpecifiedSchema: Option[StructType]) - extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) { + extends PartitioningAwareFileIndex(sparkSession, parameters, userSpecifiedSchema) { private val metadataDirectory = { val metadataDir = new Path(path, FileStreamSink.metadataDir) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 01083a994e8a1..bb536b6fee47c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -83,6 +83,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
    * to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
+   * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
+   * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
+   * It does not change the behavior of partition discovery.</li>
    * </ul>
* * @since 2.0.0 @@ -120,6 +123,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
    * to be used to parse timestamps in the JSON/CSV data sources or partition values.</li>
+   * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
+   * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
+   * It does not change the behavior of partition discovery.</li>
    * </ul>
* * @since 2.0.0 @@ -136,6 +142,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
    * to be used to parse timestamps in the JSON/CSV data sources or partition values.</li>
+   * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
+   * the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
+   * It does not change the behavior of partition discovery.</li>
    * </ul>
* * @since 2.0.0 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index c6fdf41ca7d93..3ab5bf26157a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -539,6 +539,38 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } + test("Option pathGlobFilter: filter files correctly") { + withTempPath { path => + val dataDir = path.getCanonicalPath + Seq("foo").toDS().write.text(dataDir) + Seq("bar").toDS().write.mode("append").orc(dataDir) + val df = spark.read.option("pathGlobFilter", "*.txt").text(dataDir) + checkAnswer(df, Row("foo")) + + // Both glob pattern in option and path should be effective to filter files. + val df2 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*.orc") + checkAnswer(df2, Seq.empty) + + val df3 = spark.read.option("pathGlobFilter", "*.txt").text(dataDir + "/*xt") + checkAnswer(df3, Row("foo")) + } + } + + test("Option pathGlobFilter: simple extension filtering should contains partition info") { + withTempPath { path => + val input = Seq(("foo", 1), ("oof", 2)).toDF("a", "b") + input.write.partitionBy("b").text(path.getCanonicalPath) + Seq("bar").toDS().write.mode("append").orc(path.getCanonicalPath + "/b=1") + + // If we use glob pattern in the path, the partition column won't be shown in the result. + val df = spark.read.text(path.getCanonicalPath + "/*/*.txt") + checkAnswer(df, input.select("a")) + + val df2 = spark.read.option("pathGlobFilter", "*.txt").text(path.getCanonicalPath) + checkAnswer(df2, input) + } + } + test("Return correct results when data columns overlap with partition columns") { Seq("parquet", "orc", "json").foreach { format => withTempPath { path => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 4b0bab173ae91..2b8d77386925f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -483,6 +483,25 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("Option pathGlobFilter") { + val testTableName = "FileStreamSourceTest" + withTable(testTableName) { + withTempPath { output => + Seq("foo").toDS().write.text(output.getCanonicalPath) + Seq("bar").toDS().write.mode("append").orc(output.getCanonicalPath) + val df = spark.readStream.option("pathGlobFilter", "*.txt") + .format("text").load(output.getCanonicalPath) + val query = df.writeStream.format("memory").queryName(testTableName).start() + try { + query.processAllAvailable() + checkDatasetUnorderly(spark.table(testTableName).as[String], "foo") + } finally { + query.stop() + } + } + } + } + test("read from textfile") { withTempDirs { case (src, tmp) => val textStream = spark.readStream.textFile(src.getCanonicalPath) From 5aac13c360a5a28fb1f85ac5fb72eefe8bdfc1c6 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 6 May 2019 13:41:35 -0700 Subject: [PATCH 2/8] update python comments --- python/pyspark/sql/readwriter.py | 12 ++++++++++++ python/pyspark/sql/streaming.py | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 
1353a0f6e9107..8ca610094f3ff 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -120,6 +120,9 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. """ self._jreader = self._jreader.option(key, to_str(value)) return self @@ -132,6 +135,9 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. """ for k in options: self._jreader = self._jreader.option(k, to_str(options[k])) @@ -624,6 +630,9 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. """ self._jwrite = self._jwrite.option(key, to_str(value)) return self @@ -636,6 +645,9 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. """ for k in options: self._jwrite = self._jwrite.option(k, to_str(options[k])) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index d15779bc4725a..476ed003b9db4 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -341,6 +341,9 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. .. note:: Evolving. @@ -357,6 +360,9 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. .. note:: Evolving. 
@@ -769,6 +775,9 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. .. note:: Evolving. """ @@ -783,6 +792,9 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. + * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching + the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. + It does not change the behavior of partition discovery. .. note:: Evolving. """ From a308f3fe889f4789da947a6ed69c6cebd4f87d29 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 6 May 2019 15:54:47 -0700 Subject: [PATCH 3/8] update sql-data-sources-load-save-functions.md and example code --- docs/sql-data-sources-load-save-functions.md | 22 ++++++++++++++++++ .../sql/JavaSQLDataSourceExample.java | 5 ++++ examples/src/main/python/sql/datasource.py | 5 ++++ examples/src/main/r/RSparkSQLExample.R | 4 ++++ .../do_not_read_this.txt | 1 + .../users.orc | Bin 0 -> 448 bytes .../favorite_color=red/users.orc | Bin 0 -> 402 bytes .../examples/sql/SQLDataSourceExample.scala | 5 ++++ 8 files changed, 42 insertions(+) create mode 100644 examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt create mode 100644 examples/src/main/resources/partitioned_users.orc/favorite_color=__HIVE_DEFAULT_PARTITION__/users.orc create mode 100644 examples/src/main/resources/partitioned_users.orc/favorite_color=red/users.orc diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md index a7efb9347ac64..5021a9990ead7 100644 --- a/docs/sql-data-sources-load-save-functions.md +++ b/docs/sql-data-sources-load-save-functions.md @@ -102,6 +102,28 @@ To load a CSV file you can use:
+To load files with paths matching a given glob pattern recursively while keeping the behavior of partition discovery, +you can use: + +
+
+{% include_example load_with_path_glob_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %} +
+ +
+{% include_example load_with_path_glob_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %} +
+ +
+{% include_example load_with_path_glob_filter python/sql/datasource.py %} +
+ +
+{% include_example load_with_path_glob_filter r/RSparkSQLExample.R %} + +
+
+ The extra options are also used during write operation. For example, you can control bloom filters and dictionary encodings for ORC data sources. The following ORC example will create bloom filter and use dictionary encoding only for `favorite_color`. diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index cbe9dfdaa907b..b2ce0bc08642a 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -123,6 +123,11 @@ private static void runBasicDataSourceExample(SparkSession spark) { .option("header", "true") .load("examples/src/main/resources/people.csv"); // $example off:manual_load_options_csv$ + // $example on:load_with_path_glob_filter$ + Dataset partitionedUsersDF = spark.read().format("orc") + .option("pathGlobFilter", "*.orc") + .load("examples/src/main/resources/partitioned_users.orc"); + // $example off:load_with_path_glob_filter$ // $example on:manual_save_options_orc$ usersDF.write().format("orc") .option("orc.bloom.filter.columns", "favorite_color") diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index 04660724b308d..0d78097ea975e 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -57,6 +57,11 @@ def basic_datasource_example(spark): format="csv", sep=":", inferSchema="true", header="true") # $example off:manual_load_options_csv$ + # $example on:load_with_path_glob_filter$ + df = spark.read.load("examples/src/main/resources/partitioned_users.orc", + format="orc", pathGlobFilter="*.orc") + # $example off:load_with_path_glob_filter$ + # $example on:manual_save_options_orc$ df = spark.read.orc("examples/src/main/resources/users.orc") (df.write.format("orc") diff --git a/examples/src/main/r/RSparkSQLExample.R b/examples/src/main/r/RSparkSQLExample.R index 196a110f351ce..fa083d5542fae 100644 --- a/examples/src/main/r/RSparkSQLExample.R +++ b/examples/src/main/r/RSparkSQLExample.R @@ -118,6 +118,10 @@ df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferS namesAndAges <- select(df, "name", "age") # $example off:manual_load_options_csv$ +# $example on:load_with_path_glob_filter$ +df <- read.df("examples/src/main/resources/partitioned_users.orc", "orc", pathGlobFilter = "*.orc") +# $example off:load_with_path_glob_filter$ + # $example on:manual_save_options_orc$ df <- read.df("examples/src/main/resources/users.orc", "orc") write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name") diff --git a/examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt b/examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt new file mode 100644 index 0000000000000..9c19f2a0449eb --- /dev/null +++ b/examples/src/main/resources/partitioned_users.orc/do_not_read_this.txt @@ -0,0 +1 @@ +do not read this diff --git a/examples/src/main/resources/partitioned_users.orc/favorite_color=__HIVE_DEFAULT_PARTITION__/users.orc b/examples/src/main/resources/partitioned_users.orc/favorite_color=__HIVE_DEFAULT_PARTITION__/users.orc new file mode 100644 index 0000000000000000000000000000000000000000..890395a9281abb71a8444a3c9041155fae6c0f9f GIT binary patch literal 448 
zcmZ9I&q~8U5XNVBo9(zw?Ydr;SjFZbMIi*dg&ryutq3;s6fbI&s73pSL<`=EPtdpU zMZ_oa0on0VPFPM^G~@eRf!WgU9H=ldim;DS}SrJF{zz-(+ldGL?0J-_Gzeh^9ZY$ja_ PcC+R4_ix5}{f_k!}tAl=uBlmXZBh|u*|g)<(VO4sHM8gz=Q*dq|0Nzq8h zaaAOV-=FcZP}g^L`!CzXuYyvTD7|GlK_F?~v7@;p&h{?)7#xsq1&U6i$Yy!R+-2)| zIep5ni`DEVFPDL!X5f*wD7lr#huY1{`!HK%w-0%^Tx8{AxMtsUURZAsMqw0TTvNsW iQpG=1@s?`%dUXl(hDNx}#smWV{vcA%pHe2{k^2I!Wi|=` literal 0 HcmV?d00001 diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala index 18615d9b9b908..c7b6a50f0ae7c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala @@ -56,6 +56,11 @@ object SQLDataSourceExample { .option("header", "true") .load("examples/src/main/resources/people.csv") // $example off:manual_load_options_csv$ + // $example on:load_with_path_glob_filter$ + val partitionedUsersDF = spark.read.format("orc") + .option("pathGlobFilter", "*.orc") + .load("examples/src/main/resources/partitioned_users.orc") + // $example off:load_with_path_glob_filter$ // $example on:manual_save_options_orc$ usersDF.write.format("orc") .option("orc.bloom.filter.columns", "favorite_color") From dcab4f9d2c4628fb5544ae02fe73e8b8cb582ab8 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 6 May 2019 16:28:02 -0700 Subject: [PATCH 4/8] deprecated Avro option: ignoreExtension --- .../scala/org/apache/spark/sql/avro/AvroFileFormat.scala | 4 ++++ .../main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index c2a7f31759439..be8223ccc9df0 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -57,6 +57,10 @@ private[avro] class AvroFileFormat extends FileFormat options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { val conf = spark.sessionState.newHadoopConf() + if (options.contains("ignoreExtension")) { + logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " + + "general data source option pathGlobFilter for filtering file names.") + } val parsedOptions = new AvroOptions(options, conf) // User can specify an optional avro json schema. diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index fec17bfff5424..338244aa9e53b 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -59,6 +59,7 @@ class AvroOptions( * If the option is not set, the Hadoop's config `avro.mapred.ignore.inputs.without.extension` * is taken into account. If the former one is not set too, file extensions are ignored. 
*/ + @deprecated("Use the general data source option pathGlobFilter for filtering file names", "3.0") val ignoreExtension: Boolean = { val ignoreFilesWithoutExtensionByDefault = false val ignoreFilesWithoutExtension = conf.getBoolean( @@ -66,7 +67,7 @@ class AvroOptions( ignoreFilesWithoutExtensionByDefault) parameters - .get("ignoreExtension") + .get(AvroOptions.ignoreExtensionKey) .map(_.toBoolean) .getOrElse(!ignoreFilesWithoutExtension) } @@ -93,4 +94,6 @@ object AvroOptions { .getOrElse(new Configuration()) new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf) } + + val ignoreExtensionKey = "ignoreExtension" } From 4d2c6e1f06f4b86bbe30dea6988d42905a0a57a3 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 6 May 2019 16:37:23 -0700 Subject: [PATCH 5/8] remove one empty line --- docs/sql-data-sources-load-save-functions.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md index 5021a9990ead7..0227d4874c345 100644 --- a/docs/sql-data-sources-load-save-functions.md +++ b/docs/sql-data-sources-load-save-functions.md @@ -120,7 +120,6 @@ you can use:
{% include_example load_with_path_glob_filter r/RSparkSQLExample.R %} -
From ddf18742321a5d1f48f0f1857503a5e1eac5c96f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 6 May 2019 17:14:15 -0700 Subject: [PATCH 6/8] revise --- docs/sql-data-sources-binaryFile.md | 12 +++++++----- docs/sql-data-sources-load-save-functions.md | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/sql-data-sources-binaryFile.md b/docs/sql-data-sources-binaryFile.md index 65b398208929c..0d41c9e441c61 100644 --- a/docs/sql-data-sources-binaryFile.md +++ b/docs/sql-data-sources-binaryFile.md @@ -29,13 +29,15 @@ It produces a DataFrame with the following columns and possibly partition column * `content`: BinaryType To read whole binary files, you need to specify the data source `format` as `binaryFile`. -For example, the following code reads all the files from the input directory: +To load files with paths matching a given glob pattern while keeping the behavior of partition discovery, +you can use the general data source option `pathGlobFilter`. +For example, the following code reads all PNG files from the input directory:
{% highlight scala %} -spark.read.format("binaryFile").load("/path/to/data") +spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data") {% endhighlight %}
@@ -43,21 +45,21 @@ spark.read.format("binaryFile").load("/path/to/data")
{% highlight java %} -spark.read().format("binaryFile").load("/path/to/data"); +spark.read().format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data"); {% endhighlight %}
{% highlight python %} -spark.read.format("binaryFile").load("/path/to/data") +spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load("/path/to/data") {% endhighlight %}
{% highlight r %} -read.df("/path/to/data", source = "binaryFile") +read.df("/path/to/data", source = "binaryFile", pathGlobFilter = "*.png") {% endhighlight %}
diff --git a/docs/sql-data-sources-load-save-functions.md b/docs/sql-data-sources-load-save-functions.md index 0227d4874c345..07482137a28a3 100644 --- a/docs/sql-data-sources-load-save-functions.md +++ b/docs/sql-data-sources-load-save-functions.md @@ -102,7 +102,7 @@ To load a CSV file you can use:
-To load files with paths matching a given glob pattern recursively while keeping the behavior of partition discovery, +To load files with paths matching a given glob pattern while keeping the behavior of partition discovery, you can use:
From 5d678e291f574b463f81f19a848ed381248a0bf6 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 7 May 2019 11:01:31 -0700 Subject: [PATCH 7/8] address comment --- python/pyspark/sql/readwriter.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 8ca610094f3ff..6413d88d1dcfd 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -630,9 +630,6 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching - the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. - It does not change the behavior of partition discovery. """ self._jwrite = self._jwrite.option(key, to_str(value)) return self @@ -645,9 +642,6 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching - the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. - It does not change the behavior of partition discovery. """ for k in options: self._jwrite = self._jwrite.option(k, to_str(options[k])) From d8f8420d9d3c97f96c1e09855e008ece3f275ad3 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 7 May 2019 11:03:34 -0700 Subject: [PATCH 8/8] address comment --- python/pyspark/sql/streaming.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 476ed003b9db4..b100cd1acd367 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -775,9 +775,6 @@ def option(self, key, value): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching - the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. - It does not change the behavior of partition discovery. .. note:: Evolving. """ @@ -792,9 +789,6 @@ def options(self, **options): * ``timeZone``: sets the string that indicates a timezone to be used to format timestamps in the JSON/CSV datasources or partition values. If it isn't set, it uses the default value, session local timezone. - * ``pathGlobFilter``: an optional glob pattern to only include files with paths matching - the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. - It does not change the behavior of partition discovery. .. note:: Evolving. """
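Taken as a whole, the series makes `pathGlobFilter` a general read option for batch and streaming file sources and points Avro users to it instead of the deprecated `ignoreExtension`. A rough usage sketch, with made-up paths:

{% highlight scala %}
// Rough usage sketch of the option this series introduces; paths are assumed.

// Batch: keep only .png files while leaving partition discovery untouched.
val images = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")
  .load("/path/to/data")

// Streaming: the same option is honored by file stream sources.
val logs = spark.readStream.format("text")
  .option("pathGlobFilter", "*.txt")
  .load("/path/to/logs")

// Avro: prefer pathGlobFilter over the now-deprecated ignoreExtension option.
val events = spark.read.format("avro")
  .option("pathGlobFilter", "*.avro")
  .load("/path/to/events")
{% endhighlight %}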