From 16caad61adfa62f4594ae0e6cab603c59bc7d177 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 18 Jan 2020 13:32:10 +0300 Subject: [PATCH] Avoid rebuilding Avro Options for each partition --- .../spark/sql/v2/avro/AvroPartitionReaderFactory.scala | 5 ++--- .../main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 0397d15aed924..8230dbaf8ea6c 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.SerializableConfiguration * @param dataSchema Schema of AVRO files. * @param readDataSchema Required data schema of AVRO files. * @param partitionSchema Schema of partitions. - * @param options Options for parsing AVRO files. + * @param parsedOptions Options for parsing AVRO files. 
*/ case class AvroPartitionReaderFactory( sqlConf: SQLConf, @@ -54,11 +54,10 @@ case class AvroPartitionReaderFactory( dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - options: Map[String, String]) extends FilePartitionReaderFactory with Logging { + parsedOptions: AvroOptions) extends FilePartitionReaderFactory with Logging { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value - val parsedOptions = new AvroOptions(options, conf) val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse) if (parsedOptions.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) { diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala index bb840e69d99a3..d5a29124a276e 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.AvroOptions import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex @@ -45,10 +46,11 @@ case class AvroScan( val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) val broadcastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) + val parsedOptions = new AvroOptions(caseSensitiveMap, hadoopConf) // The partition values are already truncated in `FileScan.partitions`. // We should use `readPartitionSchema` as the partition schema here. 
AvroPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema, caseSensitiveMap) + dataSchema, readDataSchema, readPartitionSchema, parsedOptions) } override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan =