From f3ccfec01851079393521884c3c5df1d0cc92644 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 25 Aug 2017 12:13:54 -0700 Subject: [PATCH 01/10] [SPARK-21839][SQL] Support SQL config for ORC compression --- .../apache/spark/sql/internal/SQLConf.scala | 11 +++++++++ .../spark/sql/hive/orc/OrcFileFormat.scala | 4 ++-- .../spark/sql/hive/orc/OrcOptions.scala | 15 ++++++++---- .../spark/sql/hive/orc/OrcSourceSuite.scala | 24 +++++++++++++++++-- 4 files changed, 45 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a685099505ee8..3dc1ec9c6ec78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -322,6 +322,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + // We can add LZO after Apache ORC 1.4.0 is used. + val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") + .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " + + "uncompressed, snappy, gzip.") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set("uncompressed", "snappy", "zlib")) + .createWithDefault("snappy") + val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") .booleanConf @@ -998,6 +1007,8 @@ class SQLConf extends Serializable with Logging { def useCompression: Boolean = getConf(COMPRESS_CACHED) + def orcCompressionCodec: String = getConf(ORC_COMPRESSION) + def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 3a34ec55c8b07..228c4c3cf6df3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -68,11 +68,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { - val orcOptions = new OrcOptions(options) + val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) val configuration = job.getConfiguration - configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec) + configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodecClassName) configuration match { case conf: JobConf => conf.setOutputFormat(classOf[OrcOutputFormat]) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala index 043eb69818ba1..321f95f3b4f0c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala @@ -20,22 +20,26 @@ package org.apache.spark.sql.hive.orc import java.util.Locale import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.internal.SQLConf /** * Options for the ORC data source. 
*/ -private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap[String]) +private[orc] class OrcOptions( + @transient private val parameters: CaseInsensitiveMap[String], + @transient private val sqlConf: SQLConf) extends Serializable { import OrcOptions._ - def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + def this(parameters: Map[String, String], sqlConf: SQLConf) = + this(CaseInsensitiveMap(parameters), sqlConf) /** - * Compression codec to use. By default snappy compression. + * Compression codec to use. By default use the value specified in SQLConf. * Acceptable values are defined in [[shortOrcCompressionCodecNames]]. */ - val compressionCodec: String = { + val compressionCodecClassName: String = { // `orc.compress` is a ORC configuration. So, here we respect this as an option but // `compression` has higher precedence than `orc.compress`. It means if both are set, // we will use `compression`. @@ -43,7 +47,8 @@ private[orc] class OrcOptions(@transient private val parameters: CaseInsensitive val codecName = parameters .get("compression") .orElse(orcCompressionConf) - .getOrElse("snappy").toLowerCase(Locale.ROOT) + .getOrElse(sqlConf.orcCompressionCodec) + .toLowerCase(Locale.ROOT) if (!shortOrcCompressionCodecNames.contains(codecName)) { val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT)) throw new IllegalArgumentException(s"Codec [$codecName] " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 52fa401d32c18..884e61c94ad6a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -18,12 +18,13 @@ package org.apache.spark.sql.hive.orc import java.io.File +import java.util.Locale import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -149,7 +150,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE") + val conf = sqlContext.sessionState.conf + assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodecClassName == "NONE") } test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") { @@ -194,6 +196,24 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA Utils.deleteRecursively(location) } } + + test("SPARK-21839: Add SQL config for ORC compression") { + val conf = sqlContext.sessionState.conf + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "SNAPPY") + + // OrcOptions's parameters have a higher priority than SQL configuration. 
+ withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") { + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "NONE") + assert( + new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodecClassName == "ZLIB") + } + + Seq("SNAPPY", "ZLIB").foreach { c => + withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == c) + } + } + } } class OrcSourceSuite extends OrcSuite { From 5998c296407a677b0cc7a810802c9b8dfb171b53 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 25 Aug 2017 12:36:09 -0700 Subject: [PATCH 02/10] fix doc. --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3dc1ec9c6ec78..aa7575cdf2304 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -325,7 +325,7 @@ object SQLConf { // We can add LZO after Apache ORC 1.4.0 is used. val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " + - "uncompressed, snappy, gzip.") + "uncompressed, snappy, zlib.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValues(Set("uncompressed", "snappy", "zlib")) From afbb6f20526681e2201c3dc90e71589b51cd9034 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 28 Aug 2017 09:35:16 -0700 Subject: [PATCH 03/10] address comments. --- .../org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 +- .../org/apache/spark/sql/hive/orc/OrcOptions.scala | 8 +++----- .../apache/spark/sql/hive/orc/OrcSourceSuite.scala | 12 ++++++------ 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index aa7575cdf2304..4785a2ab76ebb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -328,7 +328,7 @@ object SQLConf { "uncompressed, snappy, zlib.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) - .checkValues(Set("uncompressed", "snappy", "zlib")) + .checkValues(Set("none", "uncompressed", "snappy", "zlib")) .createWithDefault("snappy") val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 228c4c3cf6df3..edf2013a4c936 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -72,7 +72,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable val configuration = job.getConfiguration - configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodecClassName) + configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec) configuration match { case conf: JobConf => conf.setOutputFormat(classOf[OrcOutputFormat]) diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala index 321f95f3b4f0c..a7d40ae2f13f2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala @@ -36,13 +36,11 @@ private[orc] class OrcOptions( this(CaseInsensitiveMap(parameters), sqlConf) /** - * Compression codec to use. By default use the value specified in SQLConf. + * Compression codec to use. * Acceptable values are defined in [[shortOrcCompressionCodecNames]]. */ - val compressionCodecClassName: String = { - // `orc.compress` is a ORC configuration. So, here we respect this as an option but - // `compression` has higher precedence than `orc.compress`. It means if both are set, - // we will use `compression`. + val compressionCodec: String = { + // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` is used in order. val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION) val codecName = parameters .get("compression") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 884e61c94ad6a..8150bfc5e69c8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -151,7 +151,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { val conf = sqlContext.sessionState.conf - assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodecClassName == "NONE") + assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE") } test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") { @@ -199,18 +199,18 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA test("SPARK-21839: Add SQL config for ORC compression") { val conf = sqlContext.sessionState.conf - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "SNAPPY") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY") // OrcOptions's parameters have a higher priority than SQL configuration. withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == "NONE") + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE") assert( - new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodecClassName == "ZLIB") + new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB") } - Seq("SNAPPY", "ZLIB").foreach { c => + Seq("NONE", "SNAPPY", "ZLIB").foreach { c => withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodecClassName == c) + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == c) } } } From 437d181dd02a0468f293a3f0b5cb01dc9341e747 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 28 Aug 2017 22:00:35 -0700 Subject: [PATCH 04/10] add lzo. 
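A minimal usage sketch of what this enables (the session and paths below are illustrative, and an LZO codec must be available to the ORC writer at runtime):

    // Session-wide default for all ORC writes:
    spark.conf.set("spark.sql.orc.compression.codec", "lzo")
    spark.range(10).write.orc("/tmp/orc_lzo")

    // Per-write override via the data source option:
    spark.range(10).write.option("compression", "lzo").orc("/tmp/orc_lzo_opt")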
--- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 5 ++--- .../scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4785a2ab76ebb..c407874381ac2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -322,13 +322,12 @@ object SQLConf { .booleanConf .createWithDefault(true) - // We can add LZO after Apache ORC 1.4.0 is used. val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " + - "uncompressed, snappy, zlib.") + "none, uncompressed, snappy, zlib, lzo.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) - .checkValues(Set("none", "uncompressed", "snappy", "zlib")) + .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo")) .createWithDefault("snappy") val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 8150bfc5e69c8..81385444fc1d8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -208,7 +208,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB") } - Seq("NONE", "SNAPPY", "ZLIB").foreach { c => + Seq("NONE", "SNAPPY", "ZLIB", "LZO").foreach { c => withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == c) } From aafbfbbe2bdd45a639420105ab4b838c73de29ca Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 29 Aug 2017 09:14:43 -0700 Subject: [PATCH 05/10] Address comments. --- .../spark/sql/hive/orc/OrcSourceSuite.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 81385444fc1d8..4865700c0d128 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -199,18 +199,27 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA test("SPARK-21839: Add SQL config for ORC compression") { val conf = sqlContext.sessionState.conf + // Test if the default of spark.sql.orc.compression.codec is snappy assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY") // OrcOptions's parameters have a higher priority than SQL configuration. 
+ // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec` withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") { assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE") - assert( - new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB") + val map1 = Map("orc.compress" -> "zlib") + val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo") + assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB") + assert(new OrcOptions(map2, conf).compressionCodec == "LZO") } - Seq("NONE", "SNAPPY", "ZLIB", "LZO").foreach { c => + // Test all the valid options of spark.sql.orc.compression.codec + Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c => withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == c) + if (c == "UNCOMPRESSED") { + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE") + } else { + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == c) + } } } } From 8aebcd326f47a567ddc110281ed9e24c9fe266b2 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 29 Aug 2017 09:23:50 -0700 Subject: [PATCH 06/10] fix --- .../org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 4865700c0d128..2bfd875098c3f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -215,11 +215,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA // Test all the valid options of spark.sql.orc.compression.codec Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c => withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) { - if (c == "UNCOMPRESSED") { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE") - } else { - assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == c) - } + val expected = if (c == "UNCOMPRESSED") "NONE" else c + assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected) } } } From 94e624ea090c4c3b541890b0aa137842cfe3a373 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 29 Aug 2017 19:28:20 -0700 Subject: [PATCH 07/10] remove unused import. --- .../scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 2bfd875098c3f..781de6631f324 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.hive.orc import java.io.File -import java.util.Locale import org.scalatest.BeforeAndAfterAll From 34e2845524dbecebd4c9fd1908cf4cacaafe9acd Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 29 Aug 2017 19:58:39 -0700 Subject: [PATCH 08/10] update comments according to the review comment. 
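A short sketch of the precedence the updated comments describe (the session and path are illustrative):

    // The SQL conf supplies the default codec...
    spark.conf.set("spark.sql.orc.compression.codec", "zlib")

    // ...but `orc.compress` overrides the conf, and `compression` overrides
    // both, so this write produces snappy-compressed ORC files.
    spark.range(10).write
      .option("orc.compress", "none")
      .option("compression", "snappy")
      .orc("/tmp/orc_precedence")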
--- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index cca93525d6792..07347d2748544 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * * You can set the following ORC-specific option(s) for writing ORC files: *
<ul>
-  * <li>`compression` (default `snappy`): compression codec to use when saving to file. This can be
-  * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
-  * This will override `orc.compress`.</li>
+  * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
+  * compression codec to use when saving to file. This can be one of the known case-insensitive
+  * shorten names(`none`, `snappy`, `zlib`, and `lzo`). This will override
+  * `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
+  * it overrides `spark.sql.orc.compression.codec`.</li>
   * </ul>
* * @since 1.5.0 From 4c0300c23f951ca1fc0369f0aa5fdef27700aa3a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 29 Aug 2017 20:26:00 -0700 Subject: [PATCH 09/10] Update comments in readwriter.py, too. --- python/pyspark/sql/readwriter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 01da0dc27d83d..cb847a0420311 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -851,8 +851,9 @@ def orc(self, path, mode=None, partitionBy=None, compression=None): :param partitionBy: names of partitioning columns :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, snappy, zlib, and lzo). - This will override ``orc.compress``. If None is set, it uses the - default value, ``snappy``. + This will override ``orc.compress`` and + ``spark.sql.orc.compression.codec``. If None is set, it uses the value + specified in ``spark.sql.orc.compression.codec``. >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned') >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data')) From 9620e46c9a38549f72565e606d892c84b6301ccb Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 30 Aug 2017 12:43:04 -0700 Subject: [PATCH 10/10] fix --- .../main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala index a7d40ae2f13f2..7f94c8c579026 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala @@ -40,7 +40,8 @@ private[orc] class OrcOptions( * Acceptable values are defined in [[shortOrcCompressionCodecNames]]. */ val compressionCodec: String = { - // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` is used in order. + // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are + // in order of precedence from highest to lowest. val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION) val codecName = parameters .get("compression")
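Taken together, a sketch of the final resolution order, mirroring the suite above. It assumes the suite's test context (OrcOptions is private[orc], and withSQLConf comes from the shared SQL test utilities):

    val conf = sqlContext.sessionState.conf

    // With nothing set anywhere, the SQL conf default applies (snappy).
    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")

    withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
      // The conf value `uncompressed` maps to the ORC name NONE.
      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
      // `orc.compress` beats the SQL conf; `compression` beats both.
      assert(new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB")
      assert(new OrcOptions(
        Map("orc.compress" -> "zlib", "compression" -> "lzo"), conf).compressionCodec == "LZO")
    }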