From 432d8dd39b151e7c1acd1e2178ac16fa2ad52b92 Mon Sep 17 00:00:00 2001 From: itholic Date: Wed, 14 Apr 2021 15:31:45 +0900 Subject: [PATCH 01/18] [SPARK-35025] Move Parquet data source options from Python and Scala into a single page. --- docs/sql-data-sources-parquet.md | 18 ++++++ python/pyspark/sql/readwriter.py | 56 +++---------------- .../apache/spark/sql/DataFrameReader.scala | 43 +------------- .../apache/spark/sql/DataFrameWriter.scala | 10 +--- 4 files changed, 33 insertions(+), 94 deletions(-) diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md index 119eae56ebf74..713fea35f5510 100644 --- a/docs/sql-data-sources-parquet.md +++ b/docs/sql-data-sources-parquet.md @@ -286,6 +286,24 @@ Data source options of Parquet can be set via: read + + mergeSchema + The SQL config spark.sql.parquet.mergeSchema which is false by default. + Sets whether we should merge schemas collected from all Parquet part-files. This will override spark.sql.parquet.mergeSchema. + read + + + compression + None + Compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, and zstd). This will override spark.sql.parquet.compression.codec. If None is set, it uses the value specified in spark.sql.parquet.compression.codec. + write + + + partitionBy + None + Names of partitioning columns + write + ### Configuration diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 9a65b6fa760ed..6071a4abc52c7 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -416,53 +416,10 @@ def parquet(self, *paths, **options): Other Parameters ---------------- - mergeSchema : str or bool, optional - sets whether we should merge schemas collected from all - Parquet part-files. This will override - ``spark.sql.parquet.mergeSchema``. The default value is specified in - ``spark.sql.parquet.mergeSchema``. - pathGlobFilter : str or bool, optional - an optional glob pattern to only include files with paths matching - the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`. - It does not change the behavior of - `partition discovery `_. # noqa - recursiveFileLookup : str or bool, optional - recursively scan a directory for files. Using this option - disables - `partition discovery `_. # noqa - - modification times occurring before the specified time. The provided timestamp - must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) - modifiedBefore (batch only) : an optional timestamp to only include files with - modification times occurring before the specified time. The provided timestamp - must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) - modifiedAfter (batch only) : an optional timestamp to only include files with - modification times occurring after the specified time. The provided timestamp - must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00) - datetimeRebaseMode : str, optional - the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``, - ``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar. - - * ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps - that are ambiguous between the two calendars. - * ``CORRECTED``: loading of dates/timestamps without rebasing. - * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian - to Proleptic Gregorian calendar. 
- - If None is set, the value of the SQL config - ``spark.sql.parquet.datetimeRebaseModeInRead`` is used by default. - int96RebaseMode : str, optional - the rebasing mode for ``INT96`` timestamps from the Julian to - Proleptic Gregorian calendar. - - * ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps - that are ambiguous between the two calendars. - * ``CORRECTED``: loading of ``INT96`` timestamps without rebasing. - * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian - to Proleptic Gregorian calendar. - - If None is set, the value of the SQL config - ``spark.sql.parquet.int96RebaseModeInRead`` is used by default. + options** + Please refer to + `Data Source Option `_ + for the options. Examples -------- @@ -1265,6 +1222,11 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): ``spark.sql.parquet.compression.codec``. If None is set, it uses the value specified in ``spark.sql.parquet.compression.codec``. + Other Parameters + ---------------- + Extra options (keyword argument) + `Data Source Option `_. + Examples -------- >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 9ff37a6f1c69c..adec377319301 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -812,46 +812,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { /** * Loads a Parquet file, returning the result as a `DataFrame`. * - * You can set the following Parquet-specific option(s) for reading Parquet files: - *
- * <ul>
- * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
- * whether we should merge schemas collected from all Parquet part-files. This will override
- * `spark.sql.parquet.mergeSchema`.</li>
- * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
- * the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
- * It does not change the behavior of partition discovery.</li>
- * <li>`modifiedBefore` (batch only): an optional timestamp to only include files with
- * modification times occurring before the specified time. The provided timestamp
- * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
- * <li>`modifiedAfter` (batch only): an optional timestamp to only include files with
- * modification times occurring after the specified time. The provided timestamp
- * must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)</li>
- * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
- * disables partition discovery</li>
- * <li>`datetimeRebaseMode` (default is the value specified in the SQL config
- * `spark.sql.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
- * of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
- * Proleptic Gregorian calendar:
- * <ul>
- *   <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
- *   between the two calendars</li>
- *   <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
- *   <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
- *   Gregorian calendar</li>
- * </ul>
- * </li>
- * <li>`int96RebaseMode` (default is the value specified in the SQL config
- * `spark.sql.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
- * from the Julian to Proleptic Gregorian calendar:
- * <ul>
- *   <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
- *   between the two calendars</li>
- *   <li>`CORRECTED` : loading of timestamps without rebasing</li>
- *   <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
- *   Gregorian calendar</li>
- * </ul>
- * </li>
- * </ul>
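As a minimal sketch of how the read options documented above are passed through `DataFrameReader` — the session, path, and option values here are illustrative assumptions, not part of this patch:

```scala
// Sketch: supplying Parquet read options via DataFrameReader (illustrative values and path).
val df = spark.read
  .option("mergeSchema", "true")              // overrides spark.sql.parquet.mergeSchema
  .option("datetimeRebaseMode", "CORRECTED")  // read ancient dates/timestamps without rebasing
  .parquet("/tmp/example/input")              // hypothetical path
```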
+ * Parquet-specific option(s) for reading Parquet files can be found in + * + * Data Source Option. * * @since 1.4.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index fe6572cff5de2..a731a6b387d4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -860,13 +860,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * format("parquet").save(path) * }}} * - * You can set the following Parquet-specific option(s) for writing Parquet files: - *
- * <ul>
- * <li>`compression` (default is the value specified in `spark.sql.parquet.compression.codec`):
- * compression codec to use when saving to file. This can be one of the known case-insensitive
- * shorten names(`none`, `uncompressed`, `snappy`, `gzip`, `lzo`, `brotli`, `lz4`, and `zstd`).
- * This will override `spark.sql.parquet.compression.codec`.</li>
- * </ul>
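A similar sketch for the write-side option above; the codec, save mode, and path are illustrative assumptions:

```scala
// Sketch: setting the Parquet compression codec via DataFrameWriter (illustrative values and path).
df.write
  .option("compression", "snappy")  // overrides spark.sql.parquet.compression.codec
  .mode("overwrite")
  .parquet("/tmp/example/output")   // hypothetical path
```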
+ * Parquet-specific option(s) for writing Parquet files can be found in + * + * Data Source Option. * * @since 1.4.0 */ From b7aa8c7783c6225d32f602ebb7ef5bfdb7f7cbb7 Mon Sep 17 00:00:00 2001 From: itholic Date: Fri, 16 Apr 2021 16:01:56 +0900 Subject: [PATCH 02/18] Resolve comments --- docs/sql-data-sources-parquet.md | 1 + python/pyspark/sql/readwriter.py | 8 ++++++-- python/pyspark/sql/streaming.py | 7 +++++++ .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 5 ++++- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 5 ++++- 5 files changed, 22 insertions(+), 4 deletions(-) diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md index 713fea35f5510..fa39f7e84b6fa 100644 --- a/docs/sql-data-sources-parquet.md +++ b/docs/sql-data-sources-parquet.md @@ -305,6 +305,7 @@ Data source options of Parquet can be set via: write +Other generic options can be found in Generic Files Source Options ### Configuration diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 32fb073244fe2..651a03ae17d67 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -418,8 +418,10 @@ def parquet(self, *paths, **options): ---------------- options** Please refer to - `Data Source Option `_ + `Data Source Option `_ for the options. + Note that you should refer to the documentation of the specific version you're using + because the link above always points out the latest documentation. Examples -------- @@ -1226,7 +1228,9 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): Other Parameters ---------------- Extra options (keyword argument) - `Data Source Option `_. + `Data Source Option `_. + Note that you should refer to the documentation of the specific version you're using + because the link above always points out the latest documentation. Examples -------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 8b8484defb600..742cdabb8a291 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -714,6 +714,13 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook If None is set, the value of the SQL config ``spark.sql.parquet.int96RebaseModeInRead`` is used by default. + Other Parameters + ---------------- + Extra options (keyword argument) + `Data Source Option `_. + Note that you should refer to the documentation of the specific version you're using + because the link above always points out the latest documentation. + Examples -------- >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index adec377319301..c47e51c296174 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -813,8 +813,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * Loads a Parquet file, returning the result as a `DataFrame`. * * Parquet-specific option(s) for reading Parquet files can be found in - * + * * Data Source Option. + * Note that you should refer to the documentation of the specific version you're using + * because the link above always points out the latest documentation. 
* * @since 1.4.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a731a6b387d4b..9e5f9d3806e63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -861,8 +861,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * }}} * * Parquet-specific option(s) for writing Parquet files can be found in - * + * * Data Source Option. + * Note that you should refer to the documentation of the specific version you're using + * because the link above always points out the latest documentation. * * @since 1.4.0 */ From 082d86dd47c6cbff06db1d4b3ad85ded7abac156 Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 19 Apr 2021 12:01:33 +0900 Subject: [PATCH 03/18] Addressed comments --- python/pyspark/sql/readwriter.py | 12 +++++------- python/pyspark/sql/streaming.py | 6 +++--- .../scala/org/apache/spark/sql/DataFrameReader.scala | 4 +--- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 4 +--- 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 651a03ae17d67..62bf0428b4965 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -417,11 +417,9 @@ def parquet(self, *paths, **options): Other Parameters ---------------- options** - Please refer to + For the extra options, refer to `Data Source Option `_ - for the options. - Note that you should refer to the documentation of the specific version you're using - because the link above always points out the latest documentation. + in the version you use. Examples -------- @@ -1228,9 +1226,9 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): Other Parameters ---------------- Extra options (keyword argument) - `Data Source Option `_. - Note that you should refer to the documentation of the specific version you're using - because the link above always points out the latest documentation. + For the extra options, refer to + `Data Source Option `_ + in the version you use. Examples -------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 742cdabb8a291..634f837d7a714 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -717,9 +717,9 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook Other Parameters ---------------- Extra options (keyword argument) - `Data Source Option `_. - Note that you should refer to the documentation of the specific version you're using - because the link above always points out the latest documentation. + For the extra options, refer to + `Data Source Option `_ + in the version you use. Examples -------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index c47e51c296174..4ae90b87374ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -815,9 +815,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * Parquet-specific option(s) for reading Parquet files can be found in * - * Data Source Option. 
- * Note that you should refer to the documentation of the specific version you're using - * because the link above always points out the latest documentation. + * Data Source Option in the version you use. * * @since 1.4.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9e5f9d3806e63..57e6824c2f8d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -863,9 +863,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * Parquet-specific option(s) for writing Parquet files can be found in * - * Data Source Option. - * Note that you should refer to the documentation of the specific version you're using - * because the link above always points out the latest documentation. + * Data Source Option in the version you use. * * @since 1.4.0 */ From 53ef3ea4d34e082a9b446152c2c06fab516565dc Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 19 Apr 2021 13:01:01 +0900 Subject: [PATCH 04/18] Fix Python linter failuer --- python/pyspark/sql/readwriter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 62bf0428b4965..998f1857fa639 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -418,7 +418,7 @@ def parquet(self, *paths, **options): ---------------- options** For the extra options, refer to - `Data Source Option `_ + `Data Source Option `_ # noqa in the version you use. Examples @@ -1227,7 +1227,7 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): ---------------- Extra options (keyword argument) For the extra options, refer to - `Data Source Option `_ + `Data Source Option `_ # noqa in the version you use. Examples From 3c3e51863a0a062f0d98f9b09aee46c982b58b1f Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 29 Apr 2021 12:11:43 +0900 Subject: [PATCH 05/18] Remove the duplicated options from python docstring --- python/pyspark/sql/readwriter.py | 12 ++---------- python/pyspark/sql/streaming.py | 18 ++---------------- 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 998f1857fa639..79105d80ca3f2 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -1197,7 +1197,7 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm lineSep=lineSep, encoding=encoding, ignoreNullFields=ignoreNullFields) self._jwrite.json(path) - def parquet(self, path, mode=None, partitionBy=None, compression=None): + def parquet(self, path, mode=None): """Saves the content of the :class:`DataFrame` in Parquet format at the specified path. .. versionadded:: 1.4.0 @@ -1214,18 +1214,10 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): * ``ignore``: Silently ignore this operation if data already exists. * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ exists. - partitionBy : str or list, optional - names of partitioning columns - compression : str, optional - compression codec to use when saving to file. This can be one of the - known case-insensitive shorten names (none, uncompressed, snappy, gzip, - lzo, brotli, lz4, and zstd). This will override - ``spark.sql.parquet.compression.codec``. 
If None is set, it uses the - value specified in ``spark.sql.parquet.compression.codec``. Other Parameters ---------------- - Extra options (keyword argument) + Extra options For the extra options, refer to `Data Source Option `_ # noqa in the version you use. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 634f837d7a714..46031c7900776 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -667,8 +667,7 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N else: raise TypeError("path can be only a single string") - def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, - datetimeRebaseMode=None, int96RebaseMode=None): + def parquet(self, path, datetimeRebaseMode=None, int96RebaseMode=None): """ Loads a Parquet file stream, returning the result as a :class:`DataFrame`. @@ -676,19 +675,6 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook Parameters ---------- - mergeSchema : str or bool, optional - sets whether we should merge schemas collected from all - Parquet part-files. This will override - ``spark.sql.parquet.mergeSchema``. The default value is specified in - ``spark.sql.parquet.mergeSchema``. - pathGlobFilter : str or bool, optional - an optional glob pattern to only include files with paths matching - the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`. - It does not change the behavior of `partition discovery`_. - recursiveFileLookup : str or bool, optional - recursively scan a directory for files. Using this option - disables - `partition discovery `_. # noqa datetimeRebaseMode : str, optional the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``, ``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar. @@ -716,7 +702,7 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook Other Parameters ---------------- - Extra options (keyword argument) + Extra options For the extra options, refer to `Data Source Option `_ in the version you use. From a5648c58e1e26a9f51169f7b0b69c2a74860046e Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 29 Apr 2021 12:24:54 +0900 Subject: [PATCH 06/18] Remove more duplicated options --- python/pyspark/sql/streaming.py | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 46031c7900776..a64c0b9381f64 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -667,7 +667,7 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N else: raise TypeError("path can be only a single string") - def parquet(self, path, datetimeRebaseMode=None, int96RebaseMode=None): + def parquet(self, path): """ Loads a Parquet file stream, returning the result as a :class:`DataFrame`. @@ -675,30 +675,8 @@ def parquet(self, path, datetimeRebaseMode=None, int96RebaseMode=None): Parameters ---------- - datetimeRebaseMode : str, optional - the rebasing mode for the values of the ``DATE``, ``TIMESTAMP_MICROS``, - ``TIMESTAMP_MILLIS`` logical types from the Julian to Proleptic Gregorian calendar. - - * ``EXCEPTION``: Spark fails in reads of ancient dates/timestamps - that are ambiguous between the two calendars. - * ``CORRECTED``: loading of dates/timestamps without rebasing. 
- * ``LEGACY``: perform rebasing of ancient dates/timestamps from the Julian - to Proleptic Gregorian calendar. - - If None is set, the value of the SQL config - ``spark.sql.parquet.datetimeRebaseModeInRead`` is used by default. - int96RebaseMode : str, optional - the rebasing mode for ``INT96`` timestamps from the Julian to - Proleptic Gregorian calendar. - - * ``EXCEPTION``: Spark fails in reads of ancient ``INT96`` timestamps - that are ambiguous between the two calendars. - * ``CORRECTED``: loading of ``INT96`` timestamps without rebasing. - * ``LEGACY``: perform rebasing of ancient ``INT96`` timestamps from the Julian - to Proleptic Gregorian calendar. - - If None is set, the value of the SQL config - ``spark.sql.parquet.int96RebaseModeInRead`` is used by default. + path : str + the path in any Hadoop supported file system Other Parameters ---------------- From 5077c692ac80901dd4f95df24ec7c64a6eb5be4a Mon Sep 17 00:00:00 2001 From: itholic Date: Tue, 11 May 2021 08:57:28 +0900 Subject: [PATCH 07/18] Revert the removal of keyword arguments --- python/pyspark/sql/readwriter.py | 2 +- python/pyspark/sql/streaming.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index aa20e41c7b7a1..8b4e8408a2e0b 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -1197,7 +1197,7 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm lineSep=lineSep, encoding=encoding, ignoreNullFields=ignoreNullFields) self._jwrite.json(path) - def parquet(self, path, mode=None): + def parquet(self, path, mode=None, partitionBy=None, compression=None): """Saves the content of the :class:`DataFrame` in Parquet format at the specified path. .. versionadded:: 1.4.0 diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 028752aa1c0ba..363e7655b9e65 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -667,7 +667,8 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N else: raise TypeError("path can be only a single string") - def parquet(self, path): + def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, + datetimeRebaseMode=None, int96RebaseMode=None): """ Loads a Parquet file stream, returning the result as a :class:`DataFrame`. From bb5cd4529b07b05b21cdaf878b06b61ad717be79 Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 13 May 2021 13:44:54 +0900 Subject: [PATCH 08/18] Resolved comments --- python/pyspark/sql/readwriter.py | 4 ++++ python/pyspark/sql/streaming.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 8b4e8408a2e0b..b0420c97b605a 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -420,6 +420,8 @@ def parquet(self, *paths, **options): For the extra options, refer to `Data Source Option `_ # noqa in the version you use. + Other generic options can be found in + `Generic File Source Options _ # noqa Examples -------- @@ -1221,6 +1223,8 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): For the extra options, refer to `Data Source Option `_ # noqa in the version you use. 
+ Other generic options can be found in + `Generic File Source Options _ # noqa Examples -------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 363e7655b9e65..99db7daf3c104 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -685,6 +685,8 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook For the extra options, refer to `Data Source Option `_ in the version you use. + Other generic options can be found in + `Generic File Source Options _ # noqa Examples -------- From 3ecc1964a8bc474d5414019458dd1c499a6a9ef8 Mon Sep 17 00:00:00 2001 From: itholic Date: Fri, 14 May 2021 09:30:48 +0900 Subject: [PATCH 09/18] Add missing Generic Options link --- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 38495009d11c7..54c4006cdee9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -816,6 +816,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * Data Source Option in the version you use. + * Other generic options can be found in + * + * Generic Files Source Options. * * @since 1.4.0 */ From 883aacf94044c1b32648d6dab0ed6fc10048b580 Mon Sep 17 00:00:00 2001 From: itholic Date: Fri, 14 May 2021 13:43:05 +0900 Subject: [PATCH 10/18] Retore the patrtitionBy --- docs/sql-data-sources-parquet.md | 6 ------ python/pyspark/sql/readwriter.py | 2 ++ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md index fa39f7e84b6fa..91265410bfefc 100644 --- a/docs/sql-data-sources-parquet.md +++ b/docs/sql-data-sources-parquet.md @@ -298,12 +298,6 @@ Data source options of Parquet can be set via: Compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, and zstd). This will override spark.sql.parquet.compression.codec. If None is set, it uses the value specified in spark.sql.parquet.compression.codec. write - - partitionBy - None - Names of partitioning columns - write - Other generic options can be found in Generic Files Source Options diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index b0420c97b605a..82c58f0708c93 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -1216,6 +1216,8 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): * ``ignore``: Silently ignore this operation if data already exists. * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \ exists. 
+ partitionBy : str or list, optional + names of partitioning columns Other Parameters ---------------- From 9c4782fc2896dd94f5735775121b0db57d495480 Mon Sep 17 00:00:00 2001 From: itholic Date: Mon, 17 May 2021 21:17:25 +0900 Subject: [PATCH 11/18] Resolved comments --- python/pyspark/sql/readwriter.py | 10 +++++----- python/pyspark/sql/streaming.py | 4 ++-- .../scala/org/apache/spark/sql/DataFrameReader.scala | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 82c58f0708c93..305edbc9560be 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -416,12 +416,12 @@ def parquet(self, *paths, **options): Other Parameters ---------------- - options** + **options For the extra options, refer to `Data Source Option `_ # noqa - in the version you use. - Other generic options can be found in + and `Generic File Source Options _ # noqa + in the version you use. Examples -------- @@ -1224,9 +1224,9 @@ def parquet(self, path, mode=None, partitionBy=None, compression=None): Extra options For the extra options, refer to `Data Source Option `_ # noqa - in the version you use. - Other generic options can be found in + and `Generic File Source Options _ # noqa + in the version you use. Examples -------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 99db7daf3c104..9e6b0698aaf23 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -684,9 +684,9 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook Extra options For the extra options, refer to `Data Source Option `_ - in the version you use. - Other generic options can be found in + and `Generic File Source Options _ # noqa + in the version you use. Examples -------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 54c4006cdee9e..77a0866cf2a08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -815,11 +815,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * Parquet-specific option(s) for reading Parquet files can be found in * - * Data Source Option in the version you use. - * Other generic options can be found in + * Data Source Option + * and * - * Generic Files Source Options. + * Generic Files Source Options in the version you use. 
* * @since 1.4.0 */ From ad9f8a01a7823f01a6485ade88bf58f42d71020a Mon Sep 17 00:00:00 2001 From: itholic Date: Tue, 18 May 2021 19:24:49 +0900 Subject: [PATCH 12/18] Update DataStreamReader --- docs/sql-data-sources-parquet.md | 6 +++ .../sql/streaming/DataStreamReader.scala | 45 ++++--------------- 2 files changed, 14 insertions(+), 37 deletions(-) diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md index 91265410bfefc..4d4abcaa323dc 100644 --- a/docs/sql-data-sources-parquet.md +++ b/docs/sql-data-sources-parquet.md @@ -260,6 +260,12 @@ Data source options of Parquet can be set via: + + + + + + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 1798f6e2c88bd..109bad6b88503 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -476,43 +476,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo /** * Loads a Parquet file stream, returning the result as a `DataFrame`. * - * You can set the following Parquet-specific option(s) for reading Parquet files: - *
- * <ul>
- * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
- * considered in every trigger.</li>
- * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
- * whether we should merge schemas collected from all
- * Parquet part-files. This will override
- * `spark.sql.parquet.mergeSchema`.</li>
- * <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
- * the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
- * It does not change the behavior of partition discovery.</li>
- * <li>`recursiveFileLookup`: recursively scan a directory for files. Using this option
- * disables partition discovery</li>
- * <li>`datetimeRebaseMode` (default is the value specified in the SQL config
- * `spark.sql.parquet.datetimeRebaseModeInRead`): the rebasing mode for the values
- * of the `DATE`, `TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS` logical types from the Julian to
- * Proleptic Gregorian calendar:
- * <ul>
- *   <li>`EXCEPTION` : Spark fails in reads of ancient dates/timestamps that are ambiguous
- *   between the two calendars</li>
- *   <li>`CORRECTED` : loading of dates/timestamps without rebasing</li>
- *   <li>`LEGACY` : perform rebasing of ancient dates/timestamps from the Julian to Proleptic
- *   Gregorian calendar</li>
- * </ul>
- * </li>
- * <li>`int96RebaseMode` (default is the value specified in the SQL config
- * `spark.sql.parquet.int96RebaseModeInRead`): the rebasing mode for `INT96` timestamps
- * from the Julian to Proleptic Gregorian calendar:
- * <ul>
- *   <li>`EXCEPTION` : Spark fails in reads of ancient `INT96` timestamps that are ambiguous
- *   between the two calendars</li>
- *   <li>`CORRECTED` : loading of timestamps without rebasing</li>
- *   <li>`LEGACY` : perform rebasing of ancient `INT96` timestamps from the Julian to Proleptic
- *   Gregorian calendar</li>
- * </ul>
- * </li>
- * </ul>
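A hedged sketch of the streaming counterpart using the options above; the schema value, trigger limit, and path are illustrative assumptions (a file stream requires a user-defined schema):

```scala
// Sketch: Parquet file stream with maxFilesPerTrigger (illustrative schema, limit, and path).
val streamDf = spark.readStream
  .schema(userSchema)                  // hypothetical, pre-defined StructType
  .option("maxFilesPerTrigger", "10")  // cap on new files considered per trigger
  .parquet("/tmp/example/stream-in")   // hypothetical path
```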
+ * Parquet-specific option(s) for reading Parquet file stream can be found in + * + * Data Source Option + * and + * + * Generic Files Source Options in the version you use. * * @since 2.0.0 */ From 45e0f8f20ab231172cbaf17f58ec62b6f2e51e26 Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 20 May 2021 17:12:03 +0900 Subject: [PATCH 13/18] Resolved comments --- docs/sql-data-sources-parquet.md | 6 ------ .../scala/org/apache/spark/sql/DataFrameReader.scala | 4 ++-- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 8 ++++++-- .../apache/spark/sql/streaming/DataStreamReader.scala | 10 ++++++++-- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md index 4d4abcaa323dc..91265410bfefc 100644 --- a/docs/sql-data-sources-parquet.md +++ b/docs/sql-data-sources-parquet.md @@ -260,12 +260,6 @@ Data source options of Parquet can be set via:
  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
  <tr>
    <td><code>maxFilesPerTrigger</code></td>
    <td>None</td>
    <td>Sets the maximum number of new files to be considered in every trigger.</td>
    <td>read</td>
  </tr>
datetimeRebaseMode The SQL config spark.sql.parquet .datetimeRebaseModeInRead which is EXCEPTION by default
- - - - - - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 77a0866cf2a08..b30af89165fbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -815,8 +815,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * Parquet-specific option(s) for reading Parquet files can be found in * - * Data Source Option - * and + * Data Source Option in the version you use. + * More general options can be found in * * Generic Files Source Options in the version you use. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 57e6824c2f8d8..4f55afaef678c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -860,10 +860,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * format("parquet").save(path) * }}} * - * Parquet-specific option(s) for writing Parquet files can be found in + * Parquet-specific option(s) for reading Parquet files can be found in * - * Data Source Option in the version you use. + * Data Source Option + * and + * + * Generic Files Source Options in the version you use. * * @since 1.4.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 109bad6b88503..6f105734b8810 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -476,11 +476,17 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo /** * Loads a Parquet file stream, returning the result as a `DataFrame`. * + * You can set the following structured streaming option(s): + *
    + *
+ * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+ * considered in every trigger.</li>
+ * </ul>
+ * * Parquet-specific option(s) for reading Parquet file stream can be found in * - * Data Source Option - * and + * Data Source Option in the version you use. + * More general options can be found in * * Generic Files Source Options in the version you use. From ffc124cfd1263f07f0c39783f8ff9538c70b7a5d Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 20 May 2021 17:16:55 +0900 Subject: [PATCH 14/18] One more fix --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 4f55afaef678c..adfbe7384743f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -863,8 +863,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * Parquet-specific option(s) for reading Parquet files can be found in * - * Data Source Option - * and + * Data Source Option in the version you use. + * More general options can be found in * * Generic Files Source Options in the version you use. From 41ad66e2e99bec2ad67bf7f03e0b6511c0538d63 Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 20 May 2021 20:47:30 +0900 Subject: [PATCH 15/18] itemize the options --- docs/sql-data-sources-parquet.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md index 91265410bfefc..de278cf9f9241 100644 --- a/docs/sql-data-sources-parquet.md +++ b/docs/sql-data-sources-parquet.md @@ -255,8 +255,11 @@ REFRESH TABLE my_table; ## Data Source Option Data source options of Parquet can be set via: -* the `.option`/`.options` methods of `DataFrameReader` or `DataFrameWriter` -* the `.option`/`.options` methods of `DataStreamReader` or `DataStreamWriter` +* the `.option`/`.options` methods of + * `DataFrameReader` + * `DataFrameWriter` + * `DataStreamReader` + * `DataStreamWriter`
  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
  <tr>
    <td><code>maxFilesPerTrigger</code></td>
    <td>None</td>
    <td>Sets the maximum number of new files to be considered in every trigger.</td>
    <td>read</td>
  </tr>
datetimeRebaseMode The SQL config spark.sql.parquet .datetimeRebaseModeInRead which is EXCEPTION by default
From 2272717385abf4e7534a5c9dc39a8d1d8f26d6da Mon Sep 17 00:00:00 2001 From: itholic Date: Thu, 20 May 2021 20:53:57 +0900 Subject: [PATCH 16/18] Resolved comments --- .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 4 ---- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4 ---- .../org/apache/spark/sql/streaming/DataStreamReader.scala | 6 +----- 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index b30af89165fbc..38495009d11c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -816,10 +816,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * Data Source Option in the version you use. - * More general options can be found in - * - * Generic Files Source Options in the version you use. * * @since 1.4.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index adfbe7384743f..cabea46be9217 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -864,10 +864,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * * Data Source Option in the version you use. - * More general options can be found in - * - * Generic Files Source Options in the version you use. * * @since 1.4.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 6f105734b8810..9f5de11f26c22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -476,7 +476,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo /** * Loads a Parquet file stream, returning the result as a `DataFrame`. * - * You can set the following structured streaming option(s): + * You can set the following option(s): *
    *
 * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
 * considered in every trigger.</li>
 * </ul>
  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>