From 311482ab379446c631d642e594193d2e7f1ed442 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 21 Jul 2018 23:51:08 +0200
Subject: [PATCH 01/13] Adding compression and compression level options

---
 .../spark/sql/avro/AvroFileFormat.scala       | 16 +++++------
 .../apache/spark/sql/avro/AvroOptions.scala   | 28 +++++++++++++++++--
 .../apache/spark/sql/internal/SQLConf.scala   | 15 ++++++++++
 3 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 078efabbeeb4e..ad9cb38338ac4 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -58,7 +58,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
-    val parsedOptions = new AvroOptions(options, conf)
+    val parsedOptions = new AvroOptions(options, conf, spark.sessionState.conf)
 
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
@@ -113,16 +113,17 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
+    val parsedOptions = new AvroOptions(
+      options,
+      spark.sessionState.newHadoopConf(),
+      spark.sessionState.conf)
     val outputAvroSchema = SchemaConverters.toAvroType(
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
-    val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
     val COMPRESS_KEY = "mapred.output.compress"
 
-    spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match {
+    parsedOptions.compression match {
       case "uncompressed" =>
         log.info("writing uncompressed Avro records")
         job.getConfiguration.setBoolean(COMPRESS_KEY, false)
@@ -133,8 +134,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
 
       case "deflate" =>
-        val deflateLevel = spark.conf.get(
-          AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
+        val deflateLevel = parsedOptions.compressionLevel
         log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
@@ -158,7 +158,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     val broadcastedConf =
       spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
-    val parsedOptions = new AvroOptions(options, hadoopConf)
+    val parsedOptions = new AvroOptions(options, hadoopConf, spark.sessionState.conf)
 
     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index cd9a911a14bfa..894a9b044f686 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -21,16 +21,18 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
  */
 class AvroOptions(
     @transient val parameters: CaseInsensitiveMap[String],
-    @transient val conf: Configuration) extends Logging with Serializable {
+    @transient val conf: Configuration,
+    @transient val sqlConf: SQLConf) extends Logging with Serializable {
 
-  def this(parameters: Map[String, String], conf: Configuration) = {
-    this(CaseInsensitiveMap(parameters), conf)
+  def this(parameters: Map[String, String], conf: Configuration, sqlConf: SQLConf) = {
+    this(CaseInsensitiveMap(parameters), conf, sqlConf)
   }
 
   /**
@@ -68,4 +70,24 @@ class AvroOptions(
       .map(_.toBoolean)
       .getOrElse(!ignoreFilesWithoutExtension)
   }
+
+  /**
+   * The `compression` option allows to specify a compression codec used in write.
+   * Currently supported codecs are `uncompressed`, `snappy` and `deflate`.
+   * If the option is not set, the `snappy` compression is used by default.
+   */
+  val compression: String = parameters.get("compression").getOrElse(sqlConf.avroCompressionCodec)
+
+
+  /**
+   * Level of compression in the range of 1..9 inclusive. 1 - for fast, 9 - for best compression.
+   * If the compression level is not set for `deflate` compression, the current value of SQL
+   * config `spark.sql.avro.deflate.level` is used by default. For other compressions, the default
+   * value is `6`.
+   */
+  val compressionLevel: Int = {
+    parameters.get("compressionLevel").map(_.toInt).getOrElse {
+      if (compression.toLowerCase == "deflate") sqlConf.avroDeflateLevel else 6
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fbb9a8cfae2e1..f5932e018dd67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1422,6 +1422,17 @@ object SQLConf {
       "This only takes effect when spark.sql.repl.eagerEval.enabled is set to true.")
     .intConf
     .createWithDefault(20)
+
+  val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
+    .doc("Compression codec used in writing of AVRO files.")
+    .stringConf
+    .createWithDefault("snappy")
+
+  val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
+    .doc("Compression level for the deflate codec used in writing of AVRO files. " +
+      "Valid value must be in the range of from 1 to 9 inclusive. Default value is 6.")
+    .intConf
+    .createWithDefault(6)
 }
 
 /**
@@ -1806,6 +1817,10 @@ class SQLConf extends Serializable with Logging {
 
   def replEagerEvalTruncate: Int = getConf(SQLConf.REPL_EAGER_EVAL_TRUNCATE)
 
+  def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)
+
+  def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

From 0caa0f09e5b4a9f2f1a383f29820904783a65f02 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 21 Jul 2018 23:56:29 +0200
Subject: [PATCH 02/13] Adding a test for new options

---
 .../org/apache/spark/sql/avro/AvroSuite.scala | 28 ++++++++++++++++++-
 1 file changed, 27 insertions(+), 1 deletion(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index dad56aacf9326..cd33a40078773 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -355,7 +355,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
-  test("write with compression") {
+  test("write with compression - sql configs") {
     withTempPath { dir =>
       val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
       val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
@@ -889,4 +889,30 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       assert(count == 8)
     }
   }
+
+  test("write with compression - avro options") {
+    withTempPath { dir =>
+      val uncompressDir = s"$dir/uncompress"
+      val deflateDir = s"$dir/deflate"
+      val snappyDir = s"$dir/snappy"
+
+      val df = spark.read.avro(testAvro)
+      df.write
+        .option("compression", "uncompressed")
+        .avro(uncompressDir)
+      df.write
+        .options(Map("compression" -> "deflate", "compressionLevel" -> "9"))
+        .avro(deflateDir)
+      df.write
+        .option("compression", "snappy")
+        .avro(snappyDir)
+
+      val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
+      val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
+      val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir))
+
+      assert(uncompressSize > deflateSize)
+      assert(snappySize > deflateSize)
+    }
+  }
 }

From c5802dffd6b99832ce5abc826334f9b788608054 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 22 Jul 2018 10:47:30 +0200
Subject: [PATCH 03/13] Ticket number is added to test title.

---
 .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index cd33a40078773..56e57a40ec10c 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -890,7 +890,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
-  test("write with compression - avro options") {
+  test("SPARK-24881: write with compression - avro options") {
     withTempPath { dir =>
       val uncompressDir = s"$dir/uncompress"
       val deflateDir = s"$dir/deflate"

From f8b580ba33736a19fb14a6d7fa9fc929b4cf20ba Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 22 Jul 2018 11:15:45 +0200
Subject: [PATCH 04/13] Removing unneeded toLowerCase

---
 .../src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 894a9b044f686..9cc871dfcd8d9 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -86,8 +86,9 @@ class AvroOptions(
    * value is `6`.
    */
   val compressionLevel: Int = {
+    val defaultLevel = 6
     parameters.get("compressionLevel").map(_.toInt).getOrElse {
-      if (compression.toLowerCase == "deflate") sqlConf.avroDeflateLevel else 6
+      if (compression == "deflate") sqlConf.avroDeflateLevel else defaultLevel
     }
   }
 }

From 8c1746c6a5a5a8a85fcdb12cdff7d2471c002af9 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 23 Jul 2018 22:30:20 +0200
Subject: [PATCH 05/13] Replace avro by format("avro").save/load

---
 .../scala/org/apache/spark/sql/avro/AvroSuite.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 60226fb9b04d6..a041c708b3b0b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -903,16 +903,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val deflateDir = s"$dir/deflate"
       val snappyDir = s"$dir/snappy"
 
-      val df = spark.read.avro(testAvro)
+      val df = spark.read.format("avro").load(testAvro)
       df.write
         .option("compression", "uncompressed")
-        .avro(uncompressDir)
+        .format("avro")
+        .save(uncompressDir)
       df.write
         .options(Map("compression" -> "deflate", "compressionLevel" -> "9"))
-        .avro(deflateDir)
+        .format("avro")
+        .save(deflateDir)
       df.write
         .option("compression", "snappy")
-        .avro(snappyDir)
+        .format("avro")
+        .save(snappyDir)
 
       val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
       val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))

From d21d3e998b33b7e8c020031af69ddfdb32522477 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 23 Jul 2018 22:37:08 +0200
Subject: [PATCH 06/13] Set default value to -1 for the deflate codec.

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f52ccd9881e21..3a3c7319b5bfc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.{Locale, NoSuchElementException, Properties, TimeZone}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
+import java.util.zip.Deflater
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable
@@ -1438,9 +1439,10 @@ object SQLConf {
 
   val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
     .doc("Compression level for the deflate codec used in writing of AVRO files. " +
-      "Valid value must be in the range of from 1 to 9 inclusive. Default value is 6.")
+      "Valid value must be in the range of from 1 to 9 inclusive. " +
+      "The default value is -1 which corresponds to 6 level in the current implementation.")
     .intConf
-    .createWithDefault(6)
+    .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 }

From 41f1936be635be0308352004a7335ebf15ffdd1b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 26 Jul 2018 23:31:20 +0200
Subject: [PATCH 07/13] Reading codec name from meta info in the test

---
 .../org/apache/spark/sql/avro/AvroSuite.scala | 46 +++++++++----------
 1 file changed, 22 insertions(+), 24 deletions(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 81508751dd44e..0987b2e0bba28 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -27,8 +27,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.avro.Schema
 import org.apache.avro.Schema.{Field, Type}
-import org.apache.avro.file.DataFileWriter
-import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
+import org.apache.avro.file.{DataFileReader, DataFileWriter}
+import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
@@ -906,31 +906,29 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("SPARK-24881: write with compression - avro options") {
+    def getCodec(dir: String): Option[String] = {
+      val jsonFiles = new File(dir)
+        .listFiles()
+        .filter(_.isFile)
+        .filter(_.getName.endsWith("avro"))
+      jsonFiles.map { file =>
+        val reader = new DataFileReader(file, new GenericDatumReader[Any]())
+        val r = reader.getMetaString("avro.codec")
+        r
+      }.map(v => if (v == "null") "uncompress" else v).headOption
+    }
+    def checkCodec(df: DataFrame, dir: String, codec: String): Unit = {
+      val subdir = s"$dir/$codec"
+      df.write.option("compression", codec).format("avro").save(subdir)
+      assert(getCodec(subdir) == Some(codec))
+    }
     withTempPath { dir =>
-      val uncompressDir = s"$dir/uncompress"
-      val deflateDir = s"$dir/deflate"
-      val snappyDir = s"$dir/snappy"
-
+      val path = dir.toString
       val df = spark.read.format("avro").load(testAvro)
-      df.write
-        .option("compression", "uncompressed")
-        .format("avro")
-        .save(uncompressDir)
-      df.write
-        .options(Map("compression" -> "deflate", "compressionLevel" -> "9"))
-        .format("avro")
-        .save(deflateDir)
-      df.write
-        .option("compression", "snappy")
-        .format("avro")
-        .save(snappyDir)
 
-      val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
-      val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
-      val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir))
-
-      assert(uncompressSize > deflateSize)
-      assert(snappySize > deflateSize)
+      checkCodec(df, path, "uncompress")
+      checkCodec(df, path, "deflate")
+      checkCodec(df, path, "snappy")
     }
   }
 }

From 952bdb0c129138f6aa5689c314cdf8dbeb089cc3 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 26 Jul 2018 23:35:49 +0200
Subject: [PATCH 08/13] Removing the compressionLevel option

---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala |  8 +++-----
 .../org/apache/spark/sql/avro/AvroOptions.scala    | 14 --------------
 2 files changed, 3 insertions(+), 19 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 370d0fedb8817..1d67b5373e5dd 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -112,10 +112,8 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val parsedOptions = new AvroOptions(
-      options,
-      spark.sessionState.newHadoopConf(),
-      spark.sessionState.conf)
+    val sqlConf = spark.sessionState.conf
+    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf(), sqlConf)
     val outputAvroSchema = SchemaConverters.toAvroType(
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
@@ -133,7 +131,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
 
       case "deflate" =>
-        val deflateLevel = parsedOptions.compressionLevel
+        val deflateLevel = sqlConf.avroDeflateLevel
         log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 9cc871dfcd8d9..abd476f158d07 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -77,18 +77,4 @@ class AvroOptions(
    * If the option is not set, the `snappy` compression is used by default.
    */
   val compression: String = parameters.get("compression").getOrElse(sqlConf.avroCompressionCodec)
-
-
-  /**
-   * Level of compression in the range of 1..9 inclusive. 1 - for fast, 9 - for best compression.
-   * If the compression level is not set for `deflate` compression, the current value of SQL
-   * config `spark.sql.avro.deflate.level` is used by default. For other compressions, the default
-   * value is `6`.
-   */
-  val compressionLevel: Int = {
-    val defaultLevel = 6
-    parameters.get("compressionLevel").map(_.toInt).getOrElse {
-      if (compression == "deflate") sqlConf.avroDeflateLevel else defaultLevel
-    }
-  }
 }

From b315f372dbadd1bbc8da806d38d13ab9549511be Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 26 Jul 2018 23:44:10 +0200
Subject: [PATCH 09/13] Check config values

---
 .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 4 ++--
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala   | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 0987b2e0bba28..948f340772abe 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -915,7 +915,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         val reader = new DataFileReader(file, new GenericDatumReader[Any]())
         val r = reader.getMetaString("avro.codec")
         r
-      }.map(v => if (v == "null") "uncompress" else v).headOption
+      }.map(v => if (v == "null") "uncompressed" else v).headOption
     }
     def checkCodec(df: DataFrame, dir: String, codec: String): Unit = {
       val subdir = s"$dir/$codec"
@@ -926,7 +926,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val path = dir.toString
       val df = spark.read.format("avro").load(testAvro)
 
-      checkCodec(df, path, "uncompress")
+      checkCodec(df, path, "uncompressed")
       checkCodec(df, path, "deflate")
       checkCodec(df, path, "snappy")
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c019938e65276..e004e7ca74080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1439,13 +1439,15 @@ object SQLConf {
 
   val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
     .doc("Compression codec used in writing of AVRO files.")
     .stringConf
+    .checkValues(Set("uncompressed", "deflate", "snappy"))
     .createWithDefault("snappy")
 
   val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
     .doc("Compression level for the deflate codec used in writing of AVRO files. " +
-      "Valid value must be in the range of from 1 to 9 inclusive. " +
+      "Valid value must be in the range of from 1 to 9 inclusive or -1. " +
       "The default value is -1 which corresponds to 6 level in the current implementation.")
     .intConf
+    .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 }

From 6915f349904fe38f67009a21b31eae775801e21b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 26 Jul 2018 23:53:55 +0200
Subject: [PATCH 10/13] Fix val name

---
 .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 948f340772abe..3d0b1f2f0e2d8 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -907,11 +907,11 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
   test("SPARK-24881: write with compression - avro options") {
     def getCodec(dir: String): Option[String] = {
-      val jsonFiles = new File(dir)
+      val files = new File(dir)
        .listFiles()
         .filter(_.isFile)
         .filter(_.getName.endsWith("avro"))
-      jsonFiles.map { file =>
+      files.map { file =>
         val reader = new DataFileReader(file, new GenericDatumReader[Any]())
         val r = reader.getMetaString("avro.codec")
         r

From 5561582cae32b57a877669e60b123dcb41dcc939 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 27 Jul 2018 00:38:28 +0200
Subject: [PATCH 11/13] Document default value for codec

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e004e7ca74080..a269e218c4efd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1437,7 +1437,7 @@ object SQLConf {
     .createWithDefault(20)
 
   val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
-    .doc("Compression codec used in writing of AVRO files.")
+    .doc("Compression codec used in writing of AVRO files. Default codec is snappy.")
     .stringConf
     .checkValues(Set("uncompressed", "deflate", "snappy"))
     .createWithDefault("snappy")

From ebaf327d17ffda55a35490e080cde5b2948cc655 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 27 Jul 2018 00:43:46 +0200
Subject: [PATCH 12/13] Updating comment for Avro option

---
 .../src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index abd476f158d07..6be067456044e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -74,7 +74,8 @@ class AvroOptions(
   /**
    * The `compression` option allows to specify a compression codec used in write.
    * Currently supported codecs are `uncompressed`, `snappy` and `deflate`.
-   * If the option is not set, the `snappy` compression is used by default.
+   * If the option is not set, the `spark.sql.avro.compression.codec` config is taken into
+   * account. If the former one is not set too, the `snappy` codec is used by default.
   */
   val compression: String = parameters.get("compression").getOrElse(sqlConf.avroCompressionCodec)
 }

From 5f83902e2876745f8be245681e7cb41d69421778 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 27 Jul 2018 08:33:07 +0200
Subject: [PATCH 13/13] Addressing Liang-Chi Hsieh's review comments

---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala    |  9 ++++-----
 .../scala/org/apache/spark/sql/avro/AvroOptions.scala | 11 ++++++-----
 .../scala/org/apache/spark/sql/avro/AvroSuite.scala   | 11 +++++------
 3 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 1d67b5373e5dd..1df1c8b4af2e9 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -57,7 +57,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sessionState.newHadoopConf()
-    val parsedOptions = new AvroOptions(options, conf, spark.sessionState.conf)
+    val parsedOptions = new AvroOptions(options, conf)
 
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
@@ -112,8 +112,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val sqlConf = spark.sessionState.conf
-    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf(), sqlConf)
+    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
     val outputAvroSchema = SchemaConverters.toAvroType(
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
@@ -131,7 +130,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
 
       case "deflate" =>
-        val deflateLevel = sqlConf.avroDeflateLevel
+        val deflateLevel = spark.sessionState.conf.avroDeflateLevel
         log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
@@ -155,7 +154,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     val broadcastedConf =
       spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-    val parsedOptions = new AvroOptions(options, hadoopConf, spark.sessionState.conf)
+    val parsedOptions = new AvroOptions(options, hadoopConf)
 
     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 6be067456044e..0f59007e7f72c 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -28,11 +28,10 @@ import org.apache.spark.sql.internal.SQLConf
  */
 class AvroOptions(
     @transient val parameters: CaseInsensitiveMap[String],
-    @transient val conf: Configuration,
-    @transient val sqlConf: SQLConf) extends Logging with Serializable {
+    @transient val conf: Configuration) extends Logging with Serializable {
 
-  def this(parameters: Map[String, String], conf: Configuration, sqlConf: SQLConf) = {
-    this(CaseInsensitiveMap(parameters), conf, sqlConf)
+  def this(parameters: Map[String, String], conf: Configuration) = {
+    this(CaseInsensitiveMap(parameters), conf)
   }
 
   /**
@@ -77,5 +76,7 @@ class AvroOptions(
    * If the option is not set, the `spark.sql.avro.compression.codec` config is taken into
    * account. If the former one is not set too, the `snappy` codec is used by default.
    */
-  val compression: String = parameters.get("compression").getOrElse(sqlConf.avroCompressionCodec)
+  val compression: String = {
+    parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
+  }
 }
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 3d0b1f2f0e2d8..cbf33ea968072 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -34,6 +34,7 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
@@ -366,19 +367,17 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
   test("write with compression - sql configs") {
     withTempPath { dir =>
-      val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
-      val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
       val uncompressDir = s"$dir/uncompress"
       val deflateDir = s"$dir/deflate"
       val snappyDir = s"$dir/snappy"
 
       val df = spark.read.format("avro").load(testAvro)
-      spark.conf.set(AVRO_COMPRESSION_CODEC, "uncompressed")
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "uncompressed")
       df.write.format("avro").save(uncompressDir)
-      spark.conf.set(AVRO_COMPRESSION_CODEC, "deflate")
-      spark.conf.set(AVRO_DEFLATE_LEVEL, "9")
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "deflate")
+      spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")
       df.write.format("avro").save(deflateDir)
-      spark.conf.set(AVRO_COMPRESSION_CODEC, "snappy")
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "snappy")
       df.write.format("avro").save(snappyDir)
 
       val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))