From 57755d3427341323681eccab710005969fde7771 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Apr 2022 13:36:16 -0700 Subject: [PATCH 01/24] Removed unnessary Catalyst projection --- .../apache/hudi/HoodieMergeOnReadRDD.scala | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index c4c70cb414e32..5232b9c7f5d49 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -161,9 +161,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) protected var recordToLoad: InternalRow = _ - // TODO validate whether we need to do UnsafeProjection - protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema) - // NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals" // w/in the record payload. This is required, to project records read from the Delta Log file // which always reads records in full schema (never projected, due to the fact that DL file might @@ -178,16 +175,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val logRecords = logScanner.getRecords.asScala - // NOTE: This iterator iterates over already projected (in required schema) records // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's // going to be used, since we modify `logRecords` before that and therefore can't do it any earlier protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = logRecords.iterator.map { case (_, record) => - val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) - avroRecordOpt.map { - avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder) - } + toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps)) + .map(_.asInstanceOf[GenericRecord]) } protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] = @@ -205,7 +199,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // Record has been deleted, skipping this.hasNextInternal } else { - recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get)) + val projectedAvroRecord = projectAvroUnsafe(avroRecordOpt.get, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder) + recordToLoad = deserialize(projectedAvroRecord) true } } @@ -235,8 +230,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, override def hasNext: Boolean = { if (baseFileIterator.hasNext) { - val curRow = baseFileIterator.next() - recordToLoad = curRow + recordToLoad = baseFileIterator.next() true } else { super[LogFileIterator].hasNext @@ -275,15 +269,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // handling records @tailrec private def hasNextInternal: Boolean = { if (baseFileIterator.hasNext) { - val curRowRecord = baseFileIterator.next() - val curKey = curRowRecord.getString(recordKeyOrdinal) + val curRow = baseFileIterator.next() + val curKey = curRow.getString(recordKeyOrdinal) val updatedRecordOpt = removeLogRecord(curKey) if 
(updatedRecordOpt.isEmpty) { // No merge needed, load current row with required projected schema - recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals)) + recordToLoad = projectRowUnsafe(curRow, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals) true } else { - val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get) + val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) if (mergedAvroRecordOpt.isEmpty) { // Record has been deleted, skipping this.hasNextInternal @@ -293,8 +287,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // might already be read in projected one (as an optimization). // As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback // to [[projectAvro]] + // + // NOTE: We're projecting as [[Avro]] here primarily due to the fact that output of the merging + // seq is an Avro record. Otherwise we'd have to deserialize first (in full schema), then + // invoke [[UnsafeProjection]], however this would require us to deserialize the whole object + // into Spark's Catalyst representation first val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder) - recordToLoad = unsafeProjection(deserialize(projectedAvroRecord)) + recordToLoad = deserialize(projectedAvroRecord) true } } From 01a54f1f3dad15e3e75a81fa3e014f74e20ec736 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Apr 2022 13:47:51 -0700 Subject: [PATCH 02/24] Cleaning up naming for Catalyst expression utils --- .../main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala | 2 +- .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 2 +- .../scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala | 2 +- .../hudi/command/procedures/RunClusteringProcedure.scala | 2 +- .../test/scala/org/apache/hudi/TestDataSkippingUtils.scala | 2 +- .../scala/org/apache/spark/sql/adapter/Spark2Adapter.scala | 2 +- .../org/apache/spark/sql/adapter/Spark3_1Adapter.scala | 2 +- .../org/apache/spark/sql/adapter/Spark3_2Adapter.scala | 6 +++--- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index cd01e4fd5a065..cd30528798d66 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -45,7 +45,7 @@ trait SparkAdapter extends Serializable { * Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating * on Catalyst [[Expression]]s */ - def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils + def getCatalystExpressionUtils: HoodieCatalystExpressionUtils /** * Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index ff6515db325ac..88a1c5241e18a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -648,7 +648,7 @@ object HoodieBaseRelation extends 
SparkAdapterSupport { type BaseFileReader = PartitionedFile => Iterator[InternalRow] private def generateUnsafeProjection(from: StructType, to: StructType) = - sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to) + sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to) def convertToAvroSchema(structSchema: StructType): Schema = sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala index b5d19bd37d682..6a04ec57e1127 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -384,7 +384,7 @@ private object ColumnStatsExpressionUtils { * Returns only [[AttributeReference]] contained as a sub-expression */ object AllowedTransformationExpression extends SparkAdapterSupport { - val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils() + val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils def unapply(expr: Expression): Option[AttributeReference] = { // First step, we check that expression diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index f2ae31a0f7286..289f800b27f28 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -41,7 +41,7 @@ class RunClusteringProcedure extends BaseProcedure with Logging with SparkAdapterSupport { - private val exprUtils = sparkAdapter.getCatalystExpressionUtils() + private val exprUtils = sparkAdapter.getCatalystExpressionUtils /** * OPTIMIZE table_name|table_path [WHERE predicate] diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index bd5aa01216fcb..9300b94bb9cc0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -57,7 +57,7 @@ case class IndexRow(fileName: String, class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSupport { - val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils() + val exprUtils: HoodieCatalystExpressionUtils = sparkAdapter.getCatalystExpressionUtils var spark: SparkSession = _ diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index fdc085780047c..30af252d2dc16 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -40,7 +40,7 
@@ import scala.collection.mutable.ArrayBuffer */ class Spark2Adapter extends SparkAdapter { - override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils + override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark2CatalystExpressionUtils override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark2CatalystPlanUtils diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index 8093d7069220e..c710bbc5377fd 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansU */ class Spark3_1Adapter extends BaseSpark3Adapter { - def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark31CatalystPlanUtils + override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark3_1CatalystExpressionUtils override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index ceb66b7437ed2..fe25ee7fdc6b8 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -30,15 +30,15 @@ import org.apache.spark.sql._ */ class Spark3_2Adapter extends BaseSpark3Adapter { - def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark32CatalystPlanUtils - override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = new HoodieSpark3_2AvroSerializer(rootCatalystType, rootAvroType, nullable) override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = new HoodieSpark3_2AvroDeserializer(rootAvroType, rootCatalystType) - override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils + override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark32CatalystExpressionUtils + + override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark32CatalystPlanUtils override def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = { Some( From c77a31c5fd12ffb6aba07647156eb138ed992d94 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Apr 2022 13:51:38 -0700 Subject: [PATCH 03/24] Employ Catalyst's `UnsafeProjection` when projecting records w/o updates --- .../org/apache/hudi/HoodieBaseRelation.scala | 4 +-- .../apache/hudi/HoodieMergeOnReadRDD.scala | 32 +++++-------------- 2 files changed, 10 insertions(+), 26 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 88a1c5241e18a..ff3646fe05587 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -45,7 +45,7 @@ import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat} @@ -647,7 +647,7 @@ object HoodieBaseRelation extends SparkAdapterSupport { type BaseFileReader = PartitionedFile => Iterator[InternalRow] - private def generateUnsafeProjection(from: StructType, to: StructType) = + private def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to) def convertToAvroSchema(structSchema: StructType): Schema = diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 5232b9c7f5d49..6d9eaeae688cd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.HoodieBaseRelation.generateUnsafeProjection import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability} import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} @@ -230,6 +231,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, override def hasNext: Boolean = { if (baseFileIterator.hasNext) { + // NOTE: For 'skip-merge' querying mode base-file reader is already expected to read in the + // projected schema recordToLoad = baseFileIterator.next() true } else { @@ -255,13 +258,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // As such, no particular schema could be assumed, and therefore we rely on the caller // to correspondingly set the scheme of the expected output of base-file reader private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr) - private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema) private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema, baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema)) private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) + private val requiredSchemaUnsafeProjection = + generateUnsafeProjection(baseFileReaderSchema.structTypeSchema, requiredStructTypeSchema) + override def hasNext: 
Boolean = hasNextInternal // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure @@ -273,8 +278,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, val curKey = curRow.getString(recordKeyOrdinal) val updatedRecordOpt = removeLogRecord(curKey) if (updatedRecordOpt.isEmpty) { - // No merge needed, load current row with required projected schema - recordToLoad = projectRowUnsafe(curRow, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals) + // No merge is required, simply load current row and project into required schema + recordToLoad = requiredSchemaUnsafeProjection(curRow) true } else { val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) @@ -380,27 +385,6 @@ private object HoodieMergeOnReadRDD { } } - /** - * Projects provided instance of [[InternalRow]] into provided schema, assuming that the - * the schema of the original row is strictly a superset of the given one - */ - private def projectRowUnsafe(row: InternalRow, - projectedSchema: StructType, - ordinals: Seq[Int]): InternalRow = { - val projectedRow = new SpecificInternalRow(projectedSchema) - var curIndex = 0 - projectedSchema.zip(ordinals).foreach { case (field, pos) => - val curField = if (row.isNullAt(pos)) { - null - } else { - row.get(pos, field.dataType) - } - projectedRow.update(curIndex, curField) - curIndex += 1 - } - projectedRow - } - /** * Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the * the schema of the original row is strictly a superset of the given one From 502e93e128b06c8a8691837185e9806381902739 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Apr 2022 14:19:03 -0700 Subject: [PATCH 04/24] Consolidated Avro records projections w/in `SafeAvroProjection` --- .../apache/hudi/HoodieMergeOnReadRDD.scala | 99 ++++++++++--------- 1 file changed, 50 insertions(+), 49 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 6d9eaeae688cd..e086b81c45d84 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -23,10 +23,10 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.BaseFileReader -import org.apache.hudi.HoodieBaseRelation.generateUnsafeProjection +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection} import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} -import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability} +import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals +import org.apache.hudi.HoodieMergeOnReadRDD._ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.fs.FSUtils @@ -44,8 +44,6 @@ import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} import org.apache.spark.rdd.RDD import 
org.apache.spark.sql.avro.HoodieAvroDeserializer import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} -import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.types.StructType import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} @@ -159,14 +157,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr) - protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) protected var recordToLoad: InternalRow = _ - // NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals" - // w/in the record payload. This is required, to project records read from the Delta Log file - // which always reads records in full schema (never projected, due to the fact that DL file might - // be stored in non-columnar formats like Avro, HFile, etc) - private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema) + private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema) private var logScanner = { val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) @@ -200,7 +193,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // Record has been deleted, skipping this.hasNextInternal } else { - val projectedAvroRecord = projectAvroUnsafe(avroRecordOpt.get, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder) + val projectedAvroRecord = requiredSchemaSafeAvroProjection(avroRecordOpt.get) recordToLoad = deserialize(projectedAvroRecord) true } @@ -262,6 +255,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema, baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema)) + private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) + private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) private val requiredSchemaUnsafeProjection = @@ -297,7 +292,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // seq is an Avro record. 
Otherwise we'd have to deserialize first (in full schema), then // invoke [[UnsafeProjection]], however this would require us to deserialize the whole object // into Spark's Catalyst representation first - val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder) + val mergedAvroRecord = mergedAvroRecordOpt.get + val projection = SafeAvroProjection.create(mergedAvroRecord.getSchema, requiredAvroSchema, reusableRecordBuilder) + val projectedAvroRecord = projection.apply(mergedAvroRecord.asInstanceOf[GenericRecord]) recordToLoad = deserialize(projectedAvroRecord) true } @@ -385,45 +382,49 @@ private object HoodieMergeOnReadRDD { } } - /** - * Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the - * the schema of the original row is strictly a superset of the given one - */ - def projectAvroUnsafe(record: IndexedRecord, - projectedSchema: Schema, - ordinals: List[Int], - recordBuilder: GenericRecordBuilder): GenericRecord = { - val fields = projectedSchema.getFields.asScala - checkState(fields.length == ordinals.length) - fields.zip(ordinals).foreach { - case (field, pos) => recordBuilder.set(field, record.get(pos)) - } - recordBuilder.build() - } + // TODO extract to HoodieAvroSchemaUtils + abstract class AvroProjection extends (GenericRecord => GenericRecord) - /** - * Projects provided instance of [[IndexedRecord]] into provided schema, assuming that the - * the schema of the original row is strictly a superset of the given one - * - * This is a "safe" counterpart of [[projectAvroUnsafe]]: it does build mapping of the record's - * schema into projected one itself (instead of expecting such mapping from the caller) - */ - def projectAvro(record: IndexedRecord, - projectedSchema: Schema, - recordBuilder: GenericRecordBuilder): GenericRecord = { - projectAvroUnsafe(record, projectedSchema, collectFieldOrdinals(projectedSchema, record.getSchema), recordBuilder) + class SafeAvroProjection(sourceSchema: Schema, + projectedSchema: Schema, + reusableRecordBuilder: GenericRecordBuilder = null) extends AvroProjection { + + private val ordinals: List[Int] = collectFieldOrdinals(projectedSchema, sourceSchema) + private val recordBuilder: GenericRecordBuilder = + if (reusableRecordBuilder != null) { + reusableRecordBuilder + } else { + new GenericRecordBuilder(projectedSchema) + } + + override def apply(record: GenericRecord): GenericRecord = { + val fields = projectedSchema.getFields.asScala + checkState(fields.length == ordinals.length) + fields.zip(ordinals).foreach { + case (field, pos) => recordBuilder.set(field, record.get(pos)) + } + recordBuilder.build() + } } - /** - * Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which - * will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method - * - * @param projected target projected schema (which is a proper subset of [[source]] [[Schema]]) - * @param source source schema of the record being projected - * @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one - */ - private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = { - projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList + object SafeAvroProjection { + def create(sourceSchema: Schema, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder = null): SafeAvroProjection = + new SafeAvroProjection( + sourceSchema = sourceSchema, + 
projectedSchema = projectedSchema, + reusableRecordBuilder = reusableRecordBuilder) + + /** + * Maps [[projected]] [[Schema]] onto [[source]] one, collecting corresponding field ordinals w/in it, which + * will be subsequently used by either [[projectRowUnsafe]] or [[projectAvroUnsafe()]] method + * + * @param projected target projected schema (which is a proper subset of [[source]] [[Schema]]) + * @param source source schema of the record being projected + * @return list of ordinals of corresponding fields of [[projected]] schema w/in [[source]] one + */ + private def collectFieldOrdinals(projected: Schema, source: Schema): List[Int] = { + projected.getFields.asScala.map(f => source.getField(f.name()).pos()).toList + } } private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { From 443ee265fedbe2204b3f57e0a017bd70f7f09937 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Apr 2022 14:38:44 -0700 Subject: [PATCH 05/24] Do unsafe Avro projection after merging to avoid `AvroSafeProjection` init costs --- .../apache/hudi/HoodieMergeOnReadRDD.scala | 51 +++++++++---------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index e086b81c45d84..750134818b23a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -282,19 +282,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // Record has been deleted, skipping this.hasNextInternal } else { - // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c - // record from the Delta Log will bear (full) Table schema, while record from the Base file - // might already be read in projected one (as an optimization). - // As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback - // to [[projectAvro]] - // - // NOTE: We're projecting as [[Avro]] here primarily due to the fact that output of the merging - // seq is an Avro record. 
Otherwise we'd have to deserialize first (in full schema), then - // invoke [[UnsafeProjection]], however this would require us to deserialize the whole object - // into Spark's Catalyst representation first - val mergedAvroRecord = mergedAvroRecordOpt.get - val projection = SafeAvroProjection.create(mergedAvroRecord.getSchema, requiredAvroSchema, reusableRecordBuilder) - val projectedAvroRecord = projection.apply(mergedAvroRecord.asInstanceOf[GenericRecord]) + val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord], + requiredAvroSchema, reusableRecordBuilder) recordToLoad = deserialize(projectedAvroRecord) true } @@ -382,6 +371,27 @@ private object HoodieMergeOnReadRDD { } } + private def projectAvroUnsafe(record: GenericRecord, projectedSchema: Schema, reusableRecordBuilder: GenericRecordBuilder): GenericRecord = { + val fields = projectedSchema.getFields.asScala + fields.foreach(field => reusableRecordBuilder.set(field, record.get(field.name()))) + reusableRecordBuilder.build() + } + + private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { + // Determine partition path as an immediate parent folder of either + // - The base file + // - Some log file + split.dataFile.map(baseFile => new Path(baseFile.filePath)) + .getOrElse(split.logFiles.head.getPath) + .getParent + } + + private def resolveAvroSchemaNullability(schema: Schema) = { + AvroConversionUtils.resolveAvroTypeNullability(schema) match { + case (nullable, _) => nullable + } + } + // TODO extract to HoodieAvroSchemaUtils abstract class AvroProjection extends (GenericRecord => GenericRecord) @@ -427,21 +437,6 @@ private object HoodieMergeOnReadRDD { } } - private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = { - // Determine partition path as an immediate parent folder of either - // - The base file - // - Some log file - split.dataFile.map(baseFile => new Path(baseFile.filePath)) - .getOrElse(split.logFiles.head.getPath) - .getParent - } - - private def resolveAvroSchemaNullability(schema: Schema) = { - AvroConversionUtils.resolveAvroTypeNullability(schema) match { - case (nullable, _) => nullable - } - } - trait AvroDeserializerSupport extends SparkAdapterSupport { protected val requiredAvroSchema: Schema protected val requiredStructTypeSchema: StructType From ca3024aaa29996e542bcd1eef7398f8135918931 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 27 May 2022 16:34:24 -0700 Subject: [PATCH 06/24] Fixed handling of partition schema extraction for both COW/MOR relations by: - Pushing down extraction to the layer composing actual base-file readers - Added `BaseFileReader` abstraction holding both reader and the actual schema produced by the reader --- .../org/apache/hudi/HoodieBaseRelation.scala | 165 +++++++++--------- 1 file changed, 84 insertions(+), 81 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index ff3646fe05587..79f9ed5037637 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -334,38 +334,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt) val requiredSchema = 
HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema)) - // Since schema requested by the caller might contain partition columns, we might need to - // prune it, removing all partition columns from it in case these columns are not persisted - // in the data files - // - // NOTE: This partition schema is only relevant to file reader to be able to embed - // values of partition columns (hereafter referred to as partition values) encoded into - // the partition path, and omitted from the data file, back into fetched rows; - // Note that, by default, partition columns are not omitted therefore specifying - // partition schema for reader is not required - val (partitionSchema, dataSchema, requiredDataSchema) = - tryPrunePartitionColumns(tableSchema, requiredSchema) - if (fileSplits.isEmpty) { sparkSession.sparkContext.emptyRDD } else { - val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters) - - // NOTE: In case when partition columns have been pruned from the required schema, we have to project - // the rows from the pruned schema back into the one expected by the caller - val projectedRDD = if (requiredDataSchema.structTypeSchema != requiredSchema.structTypeSchema) { - rdd.mapPartitions { it => - val fullPrunedSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) - val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema) - it.map(unsafeProjection) - } - } else { - rdd - } + val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters) // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] // Please check [[needConversion]] scala-doc for more details - projectedRDD.asInstanceOf[RDD[Row]] + rdd.asInstanceOf[RDD[Row]] } } @@ -373,19 +349,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied * * @param fileSplits file splits to be handled by the RDD - * @param partitionSchema target table's partition schema - * @param dataSchema target table's data files' schema + * @param tableSchema target table's schema * @param requiredSchema projected schema required by the reader * @param requestedColumns columns requested by the query * @param filters data filters to be applied - * @return instance of RDD (implementing [[HoodieUnsafeRDD]]) + * @return instance of RDD (holding [[InternalRow]]s) */ protected def composeRDD(fileSplits: Seq[FileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieUnsafeRDD + filters: Array[Filter]): RDD[InternalRow] /** * Provided with partition and date filters collects target file splits to read records from, while @@ -564,42 +538,57 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // we have to eagerly initialize all of the readers even though only one specific to the type // of the file being read will be used. 
This is required to avoid serialization of the whole // relation (containing file-index for ex) and passing it to the executor - val reader = tableBaseFileFormat match { - case HoodieFileFormat.PARQUET => - HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = spark, - dataSchema = dataSchema.structTypeSchema, - partitionSchema = partitionSchema, - requiredSchema = requiredSchema.structTypeSchema, - filters = filters, - options = options, - hadoopConf = hadoopConf, - // We're delegating to Spark to append partition values to every row only in cases - // when these corresponding partition-values are not persisted w/in the data file itself - appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath - ) + val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) = + tableBaseFileFormat match { + case HoodieFileFormat.PARQUET => + ( + HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = spark, + dataSchema = dataSchema.structTypeSchema, + partitionSchema = partitionSchema, + requiredSchema = requiredSchema.structTypeSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf, + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath + ), + // Since partition values by default are omitted, and not persisted w/in data-files by Spark, + // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading + // the data. As such, actual full schema produced by such reader is composed of + // a) Prepended partition column values + // b) Data-file schema (projected or not) + StructType(partitionSchema.fields ++ requiredSchema.structTypeSchema.fields) + ) case HoodieFileFormat.HFILE => - createHFileReader( - spark = spark, - dataSchema = dataSchema, - requiredSchema = requiredSchema, - filters = filters, - options = options, - hadoopConf = hadoopConf + ( + createHFileReader( + spark = spark, + dataSchema = dataSchema, + requiredSchema = requiredSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf + ), + requiredSchema ) case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") } - partitionedFile => { - val extension = FSUtils.getFileExtension(partitionedFile.filePath) - if (tableBaseFileFormat.getFileExtension.equals(extension)) { - reader.apply(partitionedFile) - } else { - throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)") - } - } + new BaseFileReader( + read = partitionedFile => { + val extension = FSUtils.getFileExtension(partitionedFile.filePath) + if (tableBaseFileFormat.getFileExtension.equals(extension)) { + read(partitionedFile) + } else { + throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)") + } + }, + schema = schema + ) } protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = { @@ -615,8 +604,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, conf } - private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { + protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema): 
(StructType, HoodieTableSchema, HoodieTableSchema) = { + // Since schema requested by the caller might contain partition columns, we might need to + // prune it, removing all partition columns from it in case these columns are not persisted + // in the data files + // + // NOTE: This partition schema is only relevant to file reader to be able to embed + // values of partition columns (hereafter referred to as partition values) encoded into + // the partition path, and omitted from the data file, back into fetched rows; + // Note that, by default, partition columns are not omitted therefore specifying + // partition schema for reader is not required if (shouldExtractPartitionValuesFromPartitionPath) { val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType))) val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema) @@ -645,9 +643,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, object HoodieBaseRelation extends SparkAdapterSupport { - type BaseFileReader = PartitionedFile => Iterator[InternalRow] + class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], val schema: StructType) { + def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file) + } - private def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = + def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to) def convertToAvroSchema(structSchema: StructType): Schema = @@ -698,21 +698,24 @@ object HoodieBaseRelation extends SparkAdapterSupport { val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - partitionedFile => { - val hadoopConf = hadoopConfBroadcast.value.get() - val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), - new CacheConfig(hadoopConf)) - - val requiredRowSchema = requiredSchema.structTypeSchema - // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable - // to be passed from driver to executor - val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) - val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) - - reader.getRecordIterator(requiredAvroSchema).asScala - .map(record => { - avroToRowConverter.apply(record).get - }) - } + new BaseFileReader( + read = partitionedFile => { + val hadoopConf = hadoopConfBroadcast.value.get() + val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), + new CacheConfig(hadoopConf)) + + val requiredRowSchema = requiredSchema.structTypeSchema + // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable + // to be passed from driver to executor + val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) + val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) + + reader.getRecordIterator(requiredAvroSchema).asScala + .map(record => { + avroToRowConverter.apply(record).get + }) + }, + schema = requiredSchema.structTypeSchema + ) } } From b0e92090d6b2fbb206ecb3cf5810c20e668035f5 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 27 May 2022 16:35:20 -0700 Subject: [PATCH 07/24] Fixed `BaseFileOnlyRelation` --- .../apache/hudi/BaseFileOnlyRelation.scala | 27 ++++++++++++++----- 1 file changed, 21 
insertions(+), 6 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index b7033c3bfc31c..25a3dd22014c0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,9 +20,11 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieBaseRelation.generateUnsafeProjection import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.hadoop.HoodieROTablePathFilter +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources._ @@ -68,17 +70,18 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, } protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieUnsafeRDD = { + filters: Array[Filter]): RDD[InternalRow] = { + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) val baseFileReader = createBaseFileReader( spark = sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = requiredSchema, + requiredSchema = requiredDataSchema, filters = filters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it @@ -86,7 +89,19 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) ) - new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits) + val rdd = new HoodieFileScanRDD(sparkSession, baseFileReader.apply, fileSplits) + + // NOTE: In case when partition columns have been pruned from the required schema, we have to project + // the rows from the pruned schema back into the one expected by the caller + if (requiredDataSchema.structTypeSchema == requiredSchema.structTypeSchema) { + rdd + } else { + rdd.mapPartitions { it => + val fullPrunedSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) + val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema) + it.map(unsafeProjection) + } + } } protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { From 9b5f3cdce748b686a04d235f75fae2546e082026 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 27 May 2022 16:36:10 -0700 Subject: [PATCH 08/24] Fixed `MergeOnReadSnapshotRelation` --- .../apache/hudi/HoodieMergeOnReadRDD.scala | 49 +++++++++++-------- .../hudi/MergeOnReadSnapshotRelation.scala | 13 +++-- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 750134818b23a..23104410fdeca 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -59,10 +59,22 @@ case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader requiredSchemaFileReaderForMerging: BaseFileReader, requiredSchemaFileReaderForNoMerging: BaseFileReader) +/** + * RDD enabling Hudi's Merge-on-Read (MOR) semantic + * + * @param sc spark's context + * @param config hadoop configuration + * @param fileReaders suite of base file readers + * @param tableSchema table's full schema + * @param requiredSchema expected (potentially) projected schema + * @param tableState table's state + * @param mergeType type of merge performed + * @param fileSplits target file-splits this RDD will be iterating over + */ class HoodieMergeOnReadRDD(@transient sc: SparkContext, @transient config: Configuration, fileReaders: HoodieMergeOnReadBaseFileReaders, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, tableState: HoodieTableState, mergeType: String, @@ -119,7 +131,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, iter } - private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], HoodieTableSchema) = { + private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], StructType) = { // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum // of the stored data possible, while still properly executing corresponding relation's semantic // and meet the query's requirements. @@ -128,10 +140,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // a) It does use one of the standard (and whitelisted) Record Payload classes // then we can avoid reading and parsing the records w/ _full_ schema, and instead only // rely on projected one, nevertheless being able to perform merging correctly - if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) - (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema) - else - (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), requiredSchema) + val reader = if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) { + fileReaders.fullSchemaFileReader + } else { + fileReaders.requiredSchemaFileReaderForMerging + } + + (reader(split.dataFile.get), reader.schema) } override protected def getPartitions: Array[Partition] = @@ -155,14 +170,14 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema - protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr) + protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr) protected var recordToLoad: InternalRow = _ private val requiredSchemaSafeAvroProjection = SafeAvroProjection.create(logFileReaderAvroSchema, requiredAvroSchema) private var logScanner = { - val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) + val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema) HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState, maxCompactionMemoryInBytes, config, 
internalSchema) } @@ -241,7 +256,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, */ private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, baseFileIterator: Iterator[InternalRow], - baseFileReaderSchema: HoodieTableSchema, + baseFileReaderSchema: StructType, config: Configuration) extends LogFileIterator(split, config) { @@ -250,17 +265,15 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // - Projected schema // As such, no particular schema could be assumed, and therefore we rely on the caller // to correspondingly set the scheme of the expected output of base-file reader - private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr) + private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReaderSchema, nullable = false, "record") - private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema, - baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema)) + private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema, baseFileReaderAvroSchema, nullable = false) private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField) + private val recordKeyOrdinal = baseFileReaderSchema.fieldIndex(tableState.recordKeyField) - private val requiredSchemaUnsafeProjection = - generateUnsafeProjection(baseFileReaderSchema.structTypeSchema, requiredStructTypeSchema) + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReaderSchema, requiredStructTypeSchema) override def hasNext: Boolean = hasNextInternal @@ -386,12 +399,6 @@ private object HoodieMergeOnReadRDD { .getParent } - private def resolveAvroSchemaNullability(schema: Schema) = { - AvroConversionUtils.resolveAvroTypeNullability(schema) match { - case (nullable, _) => nullable - } - } - // TODO extract to HoodieAvroSchemaUtils abstract class AvroProjection extends (GenericRecord => GenericRecord) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index c6d4eafafc91d..d4913901030a8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression @@ -80,11 +81,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, } protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieMergeOnReadRDD = { + filters: Array[Filter]): RDD[InternalRow] = { + val (partitionSchema, dataSchema, 
requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + val fullSchemaBaseFileReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, @@ -102,7 +105,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, ) val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) = - createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters) + createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredDataSchema, requestedColumns, filters) val tableState = getTableState new HoodieMergeOnReadRDD( @@ -113,7 +116,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging, requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging ), - dataSchema = dataSchema, + tableSchema = dataSchema, requiredSchema = requiredSchema, tableState = tableState, mergeType = mergeType, From 6ffb5aa1d95f947e50889b9663f6845e0c9646cf Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 27 May 2022 17:55:55 -0700 Subject: [PATCH 09/24] Abstracted base-file readers creation for MOR relation behind a single method --- .../hudi/MergeOnReadSnapshotRelation.scala | 106 ++++++++++++++---- 1 file changed, 87 insertions(+), 19 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index d4913901030a8..eae6ed9947804 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -88,7 +88,29 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val (partitionSchema, dataSchema, requiredDataSchema) = tryPrunePartitionColumns(tableSchema, requiredSchema) - val fullSchemaBaseFileReader = createBaseFileReader( + val requiredFilters = Seq.empty + val optionalFilters = filters + val readers = createBaseFileReaders(partitionSchema, dataSchema, requiredDataSchema, requestedColumns, requiredFilters, optionalFilters) + + val tableState = getTableState + new HoodieMergeOnReadRDD( + sqlContext.sparkContext, + config = jobConf, + fileReaders = readers, + tableSchema = dataSchema, + requiredSchema = requiredSchema, + tableState = tableState, + mergeType = mergeType, + fileSplits = fileSplits) + } + + protected def createBaseFileReaders(partitionSchema: StructType, + dataSchema: HoodieTableSchema, + requiredDataSchema: HoodieTableSchema, + requestedColumns: Array[String], + requiredFilters: Seq[Filter], + optionalFilters: Seq[Filter] = Seq.empty): HoodieMergeOnReadBaseFileReaders = { + val fullSchemaReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, @@ -96,31 +118,77 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, // This file-reader is used to read base file records, subsequently merging them with the records // stored in delta-log files. 
As such, we have to read _all_ records from the base file, while avoiding // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that - // we combine them correctly) - filters = Seq.empty, + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) ) - val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) = - createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredDataSchema, requestedColumns, filters) + val requiredSchemaReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredSchema = requiredDataSchema, + // This file-reader is used to read base file records, subsequently merging them with the records + // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding + // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema, metaClient.getBasePath, validCommits) + ) - val tableState = getTableState - new HoodieMergeOnReadRDD( - sqlContext.sparkContext, - config = jobConf, - fileReaders = HoodieMergeOnReadBaseFileReaders( - fullSchemaFileReader = fullSchemaBaseFileReader, - requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging, - requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging - ), - tableSchema = dataSchema, - requiredSchema = requiredSchema, - tableState = tableState, - mergeType = mergeType, - fileSplits = fileSplits) + // Check whether fields required for merging were also requested to be fetched + // by the query: + // - In case they were, there's no optimization we could apply here (we will have + // to fetch such fields) + // - In case they were not, we will provide 2 separate file-readers + // a) One which would be applied to file-groups w/ delta-logs (merging) + // b) One which would be applied to file-groups w/ no delta-logs or + // in case query-mode is skipping merging + val mandatoryColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName) + if (mandatoryColumns.forall(requestedColumns.contains)) { + HoodieMergeOnReadBaseFileReaders( + fullSchemaFileReader = fullSchemaReader, + requiredSchemaFileReaderForMerging = requiredSchemaReader, + requiredSchemaFileReaderForNoMerging = requiredSchemaReader + ) + } else { + val prunedRequiredSchema = { + val superfluousColumnNames = mandatoryColumns.filterNot(requestedColumns.contains) + val prunedStructSchema = + StructType(requiredDataSchema.structTypeSchema.fields + .filterNot(f => superfluousColumnNames.contains(f.name))) + + HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString) + } + + val requiredSchemaReaderSkipMerging = createBaseFileReader( + spark = sqlContext.sparkSession, + 
partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredSchema = prunedRequiredSchema, + // This file-reader is only used in cases when no merging is performed, therefore it's safe to push + // down these filters to the base file readers + filters = requiredFilters ++ optionalFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema, metaClient.getBasePath, validCommits) + ) + + HoodieMergeOnReadBaseFileReaders( + fullSchemaFileReader = fullSchemaReader, + requiredSchemaFileReaderForMerging = requiredSchemaReader, + requiredSchemaFileReaderForNoMerging = requiredSchemaReaderSkipMerging + ) + } } protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { From 2a63374776467c8078bc642ae66e31c6d8be9784 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 27 May 2022 17:56:55 -0700 Subject: [PATCH 10/24] Rebased `MergeOnReadIncrementalRelation` onto new `createBaseFileReaders` method --- .../hudi/MergeOnReadIncrementalRelation.scala | 45 ++++++------------- 1 file changed, 14 insertions(+), 31 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 6fa130ac8caf8..14e3c731fd8b3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -17,7 +17,6 @@ package org.apache.hudi -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.model.{FileSlice, HoodieRecord} @@ -27,7 +26,9 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits} +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -58,32 +59,18 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, } protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], - partitionSchema: StructType, - dataSchema: HoodieTableSchema, + tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], - filters: Array[Filter]): HoodieMergeOnReadRDD = { - val fullSchemaParquetReader = createBaseFileReader( - spark = sqlContext.sparkSession, - partitionSchema = partitionSchema, - dataSchema = dataSchema, - requiredSchema = dataSchema, - // This file-reader is used to read base file records, subsequently merging them with the records - // stored in delta-log files. 
As such, we have to read _all_ records from the base file, while avoiding - // applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that - // we combine them correctly) - // - // The only filtering applicable here is the filtering to make sure we're only fetching records that - // fall into incremental span of the timeline being queried - filters = incrementalSpanRecordFilters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) - ) - - val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) = - createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters) + filters: Array[Filter]): RDD[InternalRow] = { + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + + // The only required filters are ones that make sure we're only fetching records that + // fall into incremental span of the timeline being queried + val requiredFilters = incrementalSpanRecordFilters + val optionalFilters = filters + val readers = createBaseFileReaders(partitionSchema, dataSchema, requiredDataSchema, requestedColumns, requiredFilters, optionalFilters) val hoodieTableState = getTableState // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately @@ -91,12 +78,8 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, - fileReaders = HoodieMergeOnReadBaseFileReaders( - fullSchemaFileReader = fullSchemaParquetReader, - requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging, - requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging - ), - dataSchema = dataSchema, + fileReaders = readers, + tableSchema = dataSchema, requiredSchema = requiredSchema, tableState = hoodieTableState, mergeType = mergeType, From be05a41a3b572db7b2d78e9a23575af834d9d165 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 27 May 2022 17:57:16 -0700 Subject: [PATCH 11/24] Fixing compilation --- .../apache/hudi/BaseFileOnlyRelation.scala | 4 +++ .../org/apache/hudi/HoodieFileScanRDD.scala | 4 +-- .../apache/hudi/HoodieMergeOnReadRDD.scala | 30 ++++++++++++++----- .../hudi/MergeOnReadSnapshotRelation.scala | 22 +++++++------- 4 files changed, 40 insertions(+), 20 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 25a3dd22014c0..3716bbb9f7073 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -21,13 +21,17 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieBaseRelation.generateUnsafeProjection +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala index a176626f76421..4b7a09795a2e1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, P case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, - readFunction: PartitionedFile => Iterator[InternalRow], + read: PartitionedFile => Iterator[InternalRow], @transient fileSplits: Seq[HoodieBaseFileSplit]) - extends FileScanRDD(sparkSession, readFunction, fileSplits.map(_.filePartition)) + extends FileScanRDD(sparkSession, read, fileSplits.map(_.filePartition)) with HoodieUnsafeRDD { override final def collect(): Array[InternalRow] = super[HoodieUnsafeRDD].collect() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 23104410fdeca..dc4e5298cd12e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -55,9 +55,25 @@ import scala.util.Try case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition -case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader, - requiredSchemaFileReaderForMerging: BaseFileReader, - requiredSchemaFileReaderForNoMerging: BaseFileReader) +/** + * Class holding base-file readers for 3 different use-cases: + * + *
+ *
+ *   1. Full-schema reader: is used when the whole row has to be read to perform merging correctly.
+ *      This could occur when no optimizations could be applied and we have to fall back to reading the whole
+ *      row from the base file and the corresponding delta-log file to merge them correctly
+ *
+ *   2. Required-schema reader: is used when it's fine to read only the row's projected columns.
+ *      This could occur when the row could be merged with the corresponding delta-log record while leveraging
+ *      only the projected columns
+ *
+ *   3. Required-schema reader (skip-merging): is used when no merging will be performed (skip-merged).
+ *      This could occur when the file-group has no delta-log files
+ *
+ */ +private[hudi] class HoodieMergeOnReadBaseFileReaders(val fullSchemaReader: BaseFileReader, + val requiredSchemaReader: BaseFileReader, + val requiredSchemaReaderSkipMerging: BaseFileReader) /** * RDD enabling Hudi's Merge-on-Read (MOR) semantic @@ -101,13 +117,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] val iter = mergeOnReadPartition.split match { case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => - fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get) + fileReaders.requiredSchemaReaderSkipMerging.apply(dataFileOnlySplit.dataFile.get) case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => new LogFileIterator(logFileOnlySplit, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => - val baseFileIterator = fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get) + val baseFileIterator = fileReaders.requiredSchemaReaderSkipMerging.apply(split.dataFile.get) new SkipMergeIterator(split, baseFileIterator, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => @@ -141,9 +157,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // then we can avoid reading and parsing the records w/ _full_ schema, and instead only // rely on projected one, nevertheless being able to perform merging correctly val reader = if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) { - fileReaders.fullSchemaFileReader + fileReaders.fullSchemaReader } else { - fileReaders.requiredSchemaFileReaderForMerging + fileReaders.requiredSchemaReader } (reader(split.dataFile.get), reader.schema) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index eae6ed9947804..62ce3a013e822 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -20,7 +20,7 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema} +import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath import org.apache.hudi.avro.HoodieAvroUtils @@ -141,7 +141,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately - hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema, metaClient.getBasePath, validCommits) + hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema) ) // Check whether fields required for merging were also requested to be fetched @@ -154,10 +154,10 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, // in case query-mode is skipping merging val mandatoryColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName) if (mandatoryColumns.forall(requestedColumns.contains)) { - 
HoodieMergeOnReadBaseFileReaders( - fullSchemaFileReader = fullSchemaReader, - requiredSchemaFileReaderForMerging = requiredSchemaReader, - requiredSchemaFileReaderForNoMerging = requiredSchemaReader + new HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReader ) } else { val prunedRequiredSchema = { @@ -180,13 +180,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it // to configure Parquet reader appropriately - hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema, metaClient.getBasePath, validCommits) + hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema) ) - HoodieMergeOnReadBaseFileReaders( - fullSchemaFileReader = fullSchemaReader, - requiredSchemaFileReaderForMerging = requiredSchemaReader, - requiredSchemaFileReaderForNoMerging = requiredSchemaReaderSkipMerging + new HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging ) } } From eb5f747b542d38b7fa6487278dfcfcc1ff100678 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 15 Jul 2022 18:00:38 -0700 Subject: [PATCH 12/24] Fixing compilation --- .../scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 62ce3a013e822..f86d46fa1b6bd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -20,7 +20,7 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath import org.apache.hudi.avro.HoodieAvroUtils From f9beb7e6597370f7eaadce5e4cdcc28dd9bccd86 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 18 Jul 2022 22:09:46 -0700 Subject: [PATCH 13/24] Typo --- .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 79f9ed5037637..da00c45b400c9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -274,7 +274,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, def canPruneRelationSchema: Boolean = (fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) && // NOTE: Some relations might be disabling sophisticated schema pruning 
techniques (for ex, nested schema pruning) - // TODO(HUDI-XXX) internal schema doesn't supported nested schema pruning currently + // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently !hasSchemaOnRead override def schema: StructType = { @@ -666,7 +666,6 @@ object HoodieBaseRelation extends SparkAdapterSupport { tableSchema match { case Right(internalSchema) => checkState(!internalSchema.isEmptySchema) - // TODO extend pruning to leverage optimizer pruned schema val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava) val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema") val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema) From b29986e9aa65f173697a54b63bc2e04c2f355b9e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Jul 2022 15:19:36 -0700 Subject: [PATCH 14/24] Fixing compilation --- .../org/apache/spark/sql/adapter/Spark3_1Adapter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index c710bbc5377fd..028bb5788cc29 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -19,20 +19,20 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils} import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark31CatalystExpressionUtils, HoodieSpark31CatalystPlanUtils} /** * Implementation of [[SparkAdapter]] for Spark 3.1.x */ class Spark3_1Adapter extends BaseSpark3Adapter { - override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark3_1CatalystExpressionUtils + override def getCatalystExpressionUtils: HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils - override def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils = HoodieSpark31CatalystExpressionUtils + override def getCatalystPlanUtils: HoodieCatalystPlansUtils = HoodieSpark31CatalystPlanUtils override def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = new HoodieSpark3_1AvroSerializer(rootCatalystType, rootAvroType, nullable) From 807927bb773642badef75fdb00b81d2d6d661469 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Jul 2022 15:32:13 -0700 Subject: [PATCH 15/24] Converted `BaseFileReader` hierarchy to be case-classes (for serializability) --- .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala | 4 ++-- .../main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala | 6 +++--- .../scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index da00c45b400c9..460081383aa12 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -643,7 +643,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, object HoodieBaseRelation extends SparkAdapterSupport { - class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], val schema: StructType) { + case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], schema: StructType) { def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file) } @@ -697,7 +697,7 @@ object HoodieBaseRelation extends SparkAdapterSupport { val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - new BaseFileReader( + BaseFileReader( read = partitionedFile => { val hadoopConf = hadoopConfBroadcast.value.get() val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index dc4e5298cd12e..477ec0fa4f546 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -71,9 +71,9 @@ case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSp * This could occur, when file-group has no delta-log files * */ -private[hudi] class HoodieMergeOnReadBaseFileReaders(val fullSchemaReader: BaseFileReader, - val requiredSchemaReader: BaseFileReader, - val requiredSchemaReaderSkipMerging: BaseFileReader) +private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: BaseFileReader, + requiredSchemaReader: BaseFileReader, + requiredSchemaReaderSkipMerging: BaseFileReader) /** * RDD enabling Hudi's Merge-on-Read (MOR) semantic diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index f86d46fa1b6bd..d71e89265fa7d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -154,7 +154,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, // in case query-mode is skipping merging val mandatoryColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName) if (mandatoryColumns.forall(requestedColumns.contains)) { - new HoodieMergeOnReadBaseFileReaders( + HoodieMergeOnReadBaseFileReaders( fullSchemaReader = fullSchemaReader, requiredSchemaReader = requiredSchemaReader, requiredSchemaReaderSkipMerging = requiredSchemaReader @@ -183,7 +183,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema) ) - new HoodieMergeOnReadBaseFileReaders( + HoodieMergeOnReadBaseFileReaders( fullSchemaReader = 
fullSchemaReader, requiredSchemaReader = requiredSchemaReader, requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging From b450eecdbc7a54c3e1eb08d1dc964375e59478a8 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 19:12:40 -0700 Subject: [PATCH 16/24] Fixed base-file reader creation seq to properly project into the required schema --- .../apache/hudi/BaseFileOnlyRelation.scala | 19 +------ .../org/apache/hudi/HoodieBaseRelation.scala | 55 ++++++++++++------- 2 files changed, 36 insertions(+), 38 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 3716bbb9f7073..009ddd8ac8572 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,18 +20,13 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.generateUnsafeProjection -import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -93,19 +88,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) ) - val rdd = new HoodieFileScanRDD(sparkSession, baseFileReader.apply, fileSplits) - - // NOTE: In case when partition columns have been pruned from the required schema, we have to project - // the rows from the pruned schema back into the one expected by the caller - if (requiredDataSchema.structTypeSchema == requiredSchema.structTypeSchema) { - rdd - } else { - rdd.mapPartitions { it => - val fullPrunedSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) - val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema) - it.map(unsafeProjection) - } - } + new HoodieFileScanRDD(sparkSession, baseFileReader.apply, fileSplits) } protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 460081383aa12..ae6689e436a8c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -541,26 +541,41 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) = tableBaseFileFormat match { case 
HoodieFileFormat.PARQUET => - ( - HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = spark, - dataSchema = dataSchema.structTypeSchema, - partitionSchema = partitionSchema, - requiredSchema = requiredSchema.structTypeSchema, - filters = filters, - options = options, - hadoopConf = hadoopConf, - // We're delegating to Spark to append partition values to every row only in cases - // when these corresponding partition-values are not persisted w/in the data file itself - appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath - ), - // Since partition values by default are omitted, and not persisted w/in data-files by Spark, - // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading - // the data. As such, actual full schema produced by such reader is composed of - // a) Prepended partition column values - // b) Data-file schema (projected or not) - StructType(partitionSchema.fields ++ requiredSchema.structTypeSchema.fields) + val rawParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( + sparkSession = spark, + dataSchema = dataSchema.structTypeSchema, + partitionSchema = partitionSchema, + requiredSchema = requiredSchema.structTypeSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf, + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath ) + // Since partition values by default are omitted, and not persisted w/in data-files by Spark, + // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading + // the data. As such, actual full schema produced by such reader is composed of + // a) Data-file schema (projected or not) + // b) Appended partition column values + val readerSchema = StructType(requiredSchema.structTypeSchema.fields ++ partitionSchema.fields) + + // NOTE: In case when file reader's schema doesn't match the schema expected by the caller (for ex, if it contains + // partition columns which might not be persisted w/in the data file, and therefore would be pruned from the required + // schema and appended into the resulting one), we have to project the rows from the base file-reader schema + // back into the one expected by the caller + val projectedReader = if (readerSchema == requiredSchema.structTypeSchema) { + rawParquetReader + } else { + file: PartitionedFile => { + // NOTE: Projection is not a serializable object, hence it creation should only happen w/in + // the executor process + val unsafeProjection = generateUnsafeProjection(readerSchema, requiredSchema.structTypeSchema) + rawParquetReader.apply(file).map(unsafeProjection) + } + } + + (projectedReader, readerSchema) case HoodieFileFormat.HFILE => ( @@ -578,7 +593,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") } - new BaseFileReader( + BaseFileReader( read = partitionedFile => { val extension = FSUtils.getFileExtension(partitionedFile.filePath) if (tableBaseFileFormat.getFileExtension.equals(extension)) { From 5b6281e6b80cbad016d3799c79d490ad206aaad4 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 19:36:04 -0700 Subject: [PATCH 17/24] Delineate clearly data-file schema --- .../apache/hudi/BaseFileOnlyRelation.scala | 2 +- 
.../org/apache/hudi/HoodieBaseRelation.scala | 24 +++++++++---------- .../hudi/MergeOnReadSnapshotRelation.scala | 10 ++++---- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 009ddd8ac8572..ec87c87ea9564 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -80,7 +80,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, spark = sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = requiredDataSchema, + requiredDataSchema = requiredDataSchema, filters = filters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index ae6689e436a8c..a6c51750acfb0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -527,7 +527,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def createBaseFileReader(spark: SparkSession, partitionSchema: StructType, dataSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, + requiredDataSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): BaseFileReader = { @@ -545,7 +545,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, sparkSession = spark, dataSchema = dataSchema.structTypeSchema, partitionSchema = partitionSchema, - requiredSchema = requiredSchema.structTypeSchema, + requiredSchema = requiredDataSchema.structTypeSchema, filters = filters, options = options, hadoopConf = hadoopConf, @@ -558,36 +558,36 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // the data. 
As such, actual full schema produced by such reader is composed of // a) Data-file schema (projected or not) // b) Appended partition column values - val readerSchema = StructType(requiredSchema.structTypeSchema.fields ++ partitionSchema.fields) + val fileReaderSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) // NOTE: In case when file reader's schema doesn't match the schema expected by the caller (for ex, if it contains // partition columns which might not be persisted w/in the data file, and therefore would be pruned from the required // schema and appended into the resulting one), we have to project the rows from the base file-reader schema // back into the one expected by the caller - val projectedReader = if (readerSchema == requiredSchema.structTypeSchema) { + val projectedReader = if (fileReaderSchema == requiredDataSchema.structTypeSchema) { rawParquetReader } else { file: PartitionedFile => { // NOTE: Projection is not a serializable object, hence it creation should only happen w/in // the executor process - val unsafeProjection = generateUnsafeProjection(readerSchema, requiredSchema.structTypeSchema) + val unsafeProjection = generateUnsafeProjection(fileReaderSchema, requiredDataSchema.structTypeSchema) rawParquetReader.apply(file).map(unsafeProjection) } } - (projectedReader, readerSchema) + (projectedReader, fileReaderSchema) case HoodieFileFormat.HFILE => ( createHFileReader( spark = spark, dataSchema = dataSchema, - requiredSchema = requiredSchema, + requiredDataSchema = requiredDataSchema, filters = filters, options = options, hadoopConf = hadoopConf ), - requiredSchema + requiredDataSchema ) case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") @@ -705,7 +705,7 @@ object HoodieBaseRelation extends SparkAdapterSupport { private def createHFileReader(spark: SparkSession, dataSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, + requiredDataSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): BaseFileReader = { @@ -718,10 +718,10 @@ object HoodieBaseRelation extends SparkAdapterSupport { val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), new CacheConfig(hadoopConf)) - val requiredRowSchema = requiredSchema.structTypeSchema + val requiredRowSchema = requiredDataSchema.structTypeSchema // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable // to be passed from driver to executor - val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr) + val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr) val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) reader.getRecordIterator(requiredAvroSchema).asScala @@ -729,7 +729,7 @@ object HoodieBaseRelation extends SparkAdapterSupport { avroToRowConverter.apply(record).get }) }, - schema = requiredSchema.structTypeSchema + schema = requiredDataSchema.structTypeSchema ) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index d71e89265fa7d..31ee5f51741fe 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -114,7 +114,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = dataSchema, + requiredDataSchema = dataSchema, // This file-reader is used to read base file records, subsequently merging them with the records // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that @@ -131,7 +131,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = requiredDataSchema, + requiredDataSchema = requiredDataSchema, // This file-reader is used to read base file records, subsequently merging them with the records // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that @@ -173,7 +173,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = prunedRequiredSchema, + requiredDataSchema = prunedRequiredSchema, // This file-reader is only used in cases when no merging is performed, therefore it's safe to push // down these filters to the base file readers filters = requiredFilters ++ optionalFilters, @@ -227,7 +227,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = requiredDataSchema, + requiredDataSchema = requiredDataSchema, filters = filters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it @@ -260,7 +260,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, spark = sqlContext.sparkSession, partitionSchema = partitionSchema, dataSchema = dataSchema, - requiredSchema = prunedRequiredSchema, + requiredDataSchema = prunedRequiredSchema, filters = filters, options = optParams, // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it From 1f3cd586f3ee941feef3094e86d5405e88436132 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 19:50:34 -0700 Subject: [PATCH 18/24] Fixed `SkipMergeIterator` to properly project into required schema --- .../scala/org/apache/hudi/HoodieMergeOnReadRDD.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 477ec0fa4f546..5e54495c5fc80 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -123,8 +123,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, new LogFileIterator(logFileOnlySplit, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => - val baseFileIterator = fileReaders.requiredSchemaReaderSkipMerging.apply(split.dataFile.get) - new SkipMergeIterator(split, 
baseFileIterator, getConfig) + val BaseFileReader(read, schema) = fileReaders.requiredSchemaReaderSkipMerging + new SkipMergeIterator(split, read(split.dataFile.get), schema, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => val (baseFileIterator, schema) = readBaseFile(split) @@ -250,14 +250,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, */ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, baseFileIterator: Iterator[InternalRow], + baseFileReaderSchema: StructType, config: Configuration) extends LogFileIterator(split, config) { + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReaderSchema, requiredStructTypeSchema) + override def hasNext: Boolean = { if (baseFileIterator.hasNext) { - // NOTE: For 'skip-merge' querying mode base-file reader is already expected to read in the - // projected schema - recordToLoad = baseFileIterator.next() + // No merge is required, simply load current row and project into required schema + recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next()) true } else { super[LogFileIterator].hasNext From d66c47cdecae96f451a136de6d715c14c724e2f7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 20:12:55 -0700 Subject: [PATCH 19/24] Fixed MOR relations to delineate schemas w/in `createBaseFileReaders` --- .../hudi/MergeOnReadIncrementalRelation.scala | 5 +---- .../apache/hudi/MergeOnReadSnapshotRelation.scala | 15 +++++++-------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 14e3c731fd8b3..6c8f40e45eda2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -63,14 +63,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { - val (partitionSchema, dataSchema, requiredDataSchema) = - tryPrunePartitionColumns(tableSchema, requiredSchema) - // The only required filters are ones that make sure we're only fetching records that // fall into incremental span of the timeline being queried val requiredFilters = incrementalSpanRecordFilters val optionalFilters = filters - val readers = createBaseFileReaders(partitionSchema, dataSchema, requiredDataSchema, requestedColumns, requiredFilters, optionalFilters) + val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) val hoodieTableState = getTableState // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 31ee5f51741fe..1cb35aa61dc4d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -85,31 +85,30 @@ class 
MergeOnReadSnapshotRelation(sqlContext: SQLContext, requiredSchema: HoodieTableSchema, requestedColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { - val (partitionSchema, dataSchema, requiredDataSchema) = - tryPrunePartitionColumns(tableSchema, requiredSchema) - val requiredFilters = Seq.empty val optionalFilters = filters - val readers = createBaseFileReaders(partitionSchema, dataSchema, requiredDataSchema, requestedColumns, requiredFilters, optionalFilters) + val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) val tableState = getTableState new HoodieMergeOnReadRDD( sqlContext.sparkContext, config = jobConf, fileReaders = readers, - tableSchema = dataSchema, + tableSchema = tableSchema, requiredSchema = requiredSchema, tableState = tableState, mergeType = mergeType, fileSplits = fileSplits) } - protected def createBaseFileReaders(partitionSchema: StructType, - dataSchema: HoodieTableSchema, - requiredDataSchema: HoodieTableSchema, + protected def createBaseFileReaders(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, requestedColumns: Array[String], requiredFilters: Seq[Filter], optionalFilters: Seq[Filter] = Seq.empty): HoodieMergeOnReadBaseFileReaders = { + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + val fullSchemaReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, From 8f0c173e74fb9ac0f7958b0148b1ffd5caf716b6 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 20:15:22 -0700 Subject: [PATCH 20/24] Revisited projecting into required-schema to only occur w/in the RDD themselves --- .../apache/hudi/BaseFileOnlyRelation.scala | 11 ++++- .../org/apache/hudi/HoodieBaseRelation.scala | 47 +++++++++++-------- .../apache/hudi/HoodieMergeOnReadRDD.scala | 5 +- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index ec87c87ea9564..416e91800f71a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,6 +20,7 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieBaseRelation.projectReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.rdd.RDD @@ -88,7 +89,15 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema) ) - new HoodieFileScanRDD(sparkSession, baseFileReader.apply, fileSplits) + // NOTE: In some case schema of the reader's output (reader's schema) might not match the schema expected by the caller. + // This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the + // data file, but instead would be parsed from the partition path. In that case output of the file-reader will have + // different ordering of the fields than the original required schema (for more details please check out + // [[ParquetFileFormat]] impl). 
In that case we have to project the rows from the file-reader's schema + // back into the one expected by the caller + val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema) + + new HoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits) } protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index a6c51750acfb0..772a00501f8a1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -541,7 +541,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) = tableBaseFileFormat match { case HoodieFileFormat.PARQUET => - val rawParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( + val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = spark, dataSchema = dataSchema.structTypeSchema, partitionSchema = partitionSchema, @@ -558,24 +558,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // the data. As such, actual full schema produced by such reader is composed of // a) Data-file schema (projected or not) // b) Appended partition column values - val fileReaderSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) - - // NOTE: In case when file reader's schema doesn't match the schema expected by the caller (for ex, if it contains - // partition columns which might not be persisted w/in the data file, and therefore would be pruned from the required - // schema and appended into the resulting one), we have to project the rows from the base file-reader schema - // back into the one expected by the caller - val projectedReader = if (fileReaderSchema == requiredDataSchema.structTypeSchema) { - rawParquetReader - } else { - file: PartitionedFile => { - // NOTE: Projection is not a serializable object, hence it creation should only happen w/in - // the executor process - val unsafeProjection = generateUnsafeProjection(fileReaderSchema, requiredDataSchema.structTypeSchema) - rawParquetReader.apply(file).map(unsafeProjection) - } - } + val readerSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields) - (projectedReader, fileReaderSchema) + (parquetReader, readerSchema) case HoodieFileFormat.HFILE => ( @@ -658,7 +643,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, object HoodieBaseRelation extends SparkAdapterSupport { - case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], schema: StructType) { + case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], val schema: StructType) { def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file) } @@ -671,6 +656,30 @@ object HoodieBaseRelation extends SparkAdapterSupport { def getPartitionPath(fileStatus: FileStatus): Path = fileStatus.getPath.getParent + /** + * TODO scala-doc + * + * @param reader + * @param readerSchema + * @param requiredSchema + * @return + */ + def projectReader(reader: BaseFileReader, requiredSchema: StructType): BaseFileReader = { + if (reader.schema == requiredSchema) { + reader + } else { + val 
read = reader.apply(_) + val projectedRead: PartitionedFile => Iterator[InternalRow] = (file: PartitionedFile) => { + // NOTE: Projection is not a serializable object, hence it creation should only happen w/in + // the executor process + val unsafeProjection = generateUnsafeProjection(reader.schema, requiredSchema) + read(file).map(unsafeProjection) + } + + BaseFileReader(projectedRead, requiredSchema) + } + } + /** * Projects provided schema by picking only required (projected) top-level columns from it * diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 5e54495c5fc80..cef52fa2e6214 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -23,7 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection} +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, generateUnsafeProjection, projectReader} import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption} import org.apache.hudi.HoodieMergeOnReadRDD.SafeAvroProjection.collectFieldOrdinals import org.apache.hudi.HoodieMergeOnReadRDD._ @@ -117,7 +117,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition] val iter = mergeOnReadPartition.split match { case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => - fileReaders.requiredSchemaReaderSkipMerging.apply(dataFileOnlySplit.dataFile.get) + val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema) + projectedReader(dataFileOnlySplit.dataFile.get) case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => new LogFileIterator(logFileOnlySplit, getConfig) From d9be73d423956a5695964e904e659a5c430f59b5 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 20:19:42 -0700 Subject: [PATCH 21/24] Tidying up --- .../scala/org/apache/hudi/HoodieBaseRelation.scala | 12 +++++++----- .../apache/hudi/MergeOnReadIncrementalRelation.scala | 2 +- pom.xml | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 772a00501f8a1..38e474daf0748 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -657,14 +657,16 @@ object HoodieBaseRelation extends SparkAdapterSupport { fileStatus.getPath.getParent /** - * TODO scala-doc + * Projects provided file reader's output from its original schema, into a [[requiredSchema]] * - * @param reader - * @param readerSchema - * @param requiredSchema - * @return + * NOTE: [[requiredSchema]] has to be a proper subset of the file reader's schema + * + * @param reader file reader to be projected + * @param requiredSchema target schema for the output of the provided file reader */ def 
projectReader(reader: BaseFileReader, requiredSchema: StructType): BaseFileReader = { + checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size == requiredSchema.size) + if (reader.schema == requiredSchema) { reader } else { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 6c8f40e45eda2..0fc6ef2f83aec 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -76,7 +76,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, sqlContext.sparkContext, config = jobConf, fileReaders = readers, - tableSchema = dataSchema, + tableSchema = tableSchema, requiredSchema = requiredSchema, tableState = hoodieTableState, mergeType = mergeType, diff --git a/pom.xml b/pom.xml index 36fbfb4505d89..fd4f4fe50bfa3 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,7 @@ 2.11.12 2.12.10 ${scala11.version} - 2.11 + 2.12 0.13 3.3.1 3.0.1 From 381d9af03cb6456295392d9d923f736bae8764e3 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 20:29:53 -0700 Subject: [PATCH 22/24] Tidying up --- .../apache/hudi/HoodieMergeOnReadRDD.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index cef52fa2e6214..512c97806f31d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -124,12 +124,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, new LogFileIterator(logFileOnlySplit, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) => - val BaseFileReader(read, schema) = fileReaders.requiredSchemaReaderSkipMerging - new SkipMergeIterator(split, read(split.dataFile.get), schema, getConfig) + val reader = fileReaders.requiredSchemaReaderSkipMerging + new SkipMergeIterator(split, reader, getConfig) case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) => - val (baseFileIterator, schema) = readBaseFile(split) - new RecordMergingFileIterator(split, baseFileIterator, schema, getConfig) + val reader = pickBaseFileReader + new RecordMergingFileIterator(split, reader, getConfig) case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " + s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" + @@ -148,7 +148,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, iter } - private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], StructType) = { + private def pickBaseFileReader: BaseFileReader = { // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum // of the stored data possible, while still properly executing corresponding relation's semantic // and meet the query's requirements. 
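The refactor above converges on a single pattern: a base-file reader is a read function bundled with the schema it produces, and a projection is wrapped around it only when that schema differs from the required one. Below is a minimal, self-contained sketch of that pattern; Row, FileHandle, SimpleSchema and the field-map projection are illustrative stand-ins, not Spark or Hudi APIs (the real code uses InternalRow, PartitionedFile, StructType and UnsafeProjection).

object ReaderProjectionSketch {
  type Row = Map[String, Any]                        // stand-in for Spark's InternalRow
  final case class FileHandle(path: String)          // stand-in for PartitionedFile
  final case class SimpleSchema(fields: Seq[String]) // stand-in for StructType

  // Mirrors BaseFileReader: the read function travels together with the schema it produces
  final case class Reader(read: FileHandle => Iterator[Row], schema: SimpleSchema) {
    def apply(file: FileHandle): Iterator[Row] = read(file)
  }

  // Mirrors projectReader: wrap the reader only when its output schema differs from the
  // required one; the projection itself is built lazily inside the per-file closure
  def projectReader(reader: Reader, requiredSchema: SimpleSchema): Reader = {
    require(requiredSchema.fields.forall(reader.schema.fields.contains),
      "required schema must be a subset of the reader's schema")
    if (reader.schema == requiredSchema) {
      reader
    } else {
      val projectedRead = (file: FileHandle) => {
        // Built per file rather than captured, echoing the non-serializable
        // UnsafeProjection being created on the executor in the patch
        val project: Row => Row = row => requiredSchema.fields.map(f => f -> row(f)).toMap
        reader.read(file).map(project)
      }
      Reader(projectedRead, requiredSchema)
    }
  }

  def main(args: Array[String]): Unit = {
    val fullSchema = SimpleSchema(Seq("key", "ts", "value", "partition"))
    val baseReader = Reader(
      _ => Iterator(Map("key" -> "k1", "ts" -> 1L, "value" -> "v1", "partition" -> "p1")),
      fullSchema)

    val projected = projectReader(baseReader, SimpleSchema(Seq("key", "value")))
    projected(FileHandle("part-0000.parquet")).foreach(println) // Map(key -> k1, value -> v1)
  }
}

Keeping the projection inside the per-file closure mirrors the NOTE in the patch: the projection is built where the read actually happens, so nothing non-serializable has to be shipped from the driver.
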
@@ -157,13 +157,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // a) It does use one of the standard (and whitelisted) Record Payload classes // then we can avoid reading and parsing the records w/ _full_ schema, and instead only // rely on projected one, nevertheless being able to perform merging correctly - val reader = if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) { - fileReaders.fullSchemaReader - } else { + if (whitelistedPayloadClasses.contains(tableState.recordPayloadClassName)) { fileReaders.requiredSchemaReader + } else { + fileReaders.fullSchemaReader } - - (reader(split.dataFile.get), reader.schema) } override protected def getPartitions: Array[Partition] = @@ -250,12 +248,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially) */ private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], - baseFileReaderSchema: StructType, + baseFileReader: BaseFileReader, config: Configuration) extends LogFileIterator(split, config) { - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReaderSchema, requiredStructTypeSchema) + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) + + private val baseFileIterator = baseFileReader(split.dataFile.get) override def hasNext: Boolean = { if (baseFileIterator.hasNext) { @@ -274,8 +273,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, * streams */ private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], - baseFileReaderSchema: StructType, + baseFileReader: BaseFileReader, config: Configuration) extends LogFileIterator(split, config) { @@ -284,15 +282,17 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, // - Projected schema // As such, no particular schema could be assumed, and therefore we rely on the caller // to correspondingly set the scheme of the expected output of base-file reader - private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReaderSchema, nullable = false, "record") + private val baseFileReaderAvroSchema = sparkAdapter.getAvroSchemaConverters.toAvroType(baseFileReader.schema, nullable = false, "record") - private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema, baseFileReaderAvroSchema, nullable = false) + private val serializer = sparkAdapter.createAvroSerializer(baseFileReader.schema, baseFileReaderAvroSchema, nullable = false) private val reusableRecordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val recordKeyOrdinal = baseFileReaderSchema.fieldIndex(tableState.recordKeyField) + private val recordKeyOrdinal = baseFileReader.schema.fieldIndex(tableState.recordKeyField) + + private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReader.schema, requiredStructTypeSchema) - private val requiredSchemaUnsafeProjection = generateUnsafeProjection(baseFileReaderSchema, requiredStructTypeSchema) + private val baseFileIterator = baseFileReader(split.dataFile.get) override def hasNext: Boolean = hasNextInternal From 4865e96e81e485a57b8c455012459381b37b1909 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Jul 2022 20:32:55 -0700 Subject: [PATCH 23/24] Reverting accidental change --- pom.xml | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fd4f4fe50bfa3..36fbfb4505d89 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,7 @@ 2.11.12 2.12.10 ${scala11.version} - 2.12 + 2.11 0.13 3.3.1 3.0.1 From e121b002945d5aad30b00893dbb00a706be2ebba Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 22 Jul 2022 16:07:52 -0700 Subject: [PATCH 24/24] Fixing HFile reader creation to match other file-formats; Tidying up --- .../org/apache/hudi/HoodieBaseRelation.scala | 56 +++++++++---------- .../apache/hudi/HoodieDataSourceHelper.scala | 18 +++--- .../spark/sql/hudi/TestInsertTable.scala | 3 +- 3 files changed, 37 insertions(+), 40 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 38e474daf0748..5274f257e15b2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -563,18 +563,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, (parquetReader, readerSchema) case HoodieFileFormat.HFILE => - ( - createHFileReader( - spark = spark, - dataSchema = dataSchema, - requiredDataSchema = requiredDataSchema, - filters = filters, - options = options, - hadoopConf = hadoopConf - ), - requiredDataSchema + val hfileReader = createHFileReader( + spark = spark, + dataSchema = dataSchema, + requiredDataSchema = requiredDataSchema, + filters = filters, + options = options, + hadoopConf = hadoopConf ) + (hfileReader, requiredDataSchema.structTypeSchema) + case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") } @@ -719,28 +718,25 @@ object HoodieBaseRelation extends SparkAdapterSupport { requiredDataSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], - hadoopConf: Configuration): BaseFileReader = { + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - BaseFileReader( - read = partitionedFile => { - val hadoopConf = hadoopConfBroadcast.value.get() - val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), - new CacheConfig(hadoopConf)) - - val requiredRowSchema = requiredDataSchema.structTypeSchema - // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable - // to be passed from driver to executor - val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr) - val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) - - reader.getRecordIterator(requiredAvroSchema).asScala - .map(record => { - avroToRowConverter.apply(record).get - }) - }, - schema = requiredDataSchema.structTypeSchema - ) + partitionedFile => { + val hadoopConf = hadoopConfBroadcast.value.get() + val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), + new CacheConfig(hadoopConf)) + + val requiredRowSchema = requiredDataSchema.structTypeSchema + // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable + // to be passed from driver to executor + val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr) + val 
avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema) + + reader.getRecordIterator(requiredAvroSchema).asScala + .map(record => { + avroToRowConverter.apply(record).get + }) + } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 6c721723c50a3..8bd295c7f3db4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -41,15 +41,17 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { /** * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]], * when Parquet's Vectorized Reader is used + * + * TODO move to HoodieBaseRelation, make private */ - def buildHoodieParquetReader(sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration, - appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { + private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration, + appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( sparkSession = sparkSession, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 57c826af92ca6..9aa3c509c3dab 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.hudi -import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE} +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.internal.SQLConf import java.io.File
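
To make the shape of the reworked HFile read path concrete, here is a minimal, self-contained sketch of the same iterator-of-Avro-records-to-rows conversion, using only the Avro library. The inline schema, the hand-rolled toRow function and the stubbed record iterator are illustrative stand-ins for requiredDataSchema.avroSchemaStr, AvroConversionUtils.createAvroToInternalRowConverter and HoodieHFileReader.getRecordIterator respectively.

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

import scala.collection.JavaConverters._

object HFileStyleReaderSketch {
  private val requiredAvroSchemaJson =
    """{"type":"record","name":"rec","fields":[
      |  {"name":"key","type":"string"},
      |  {"name":"value","type":"long"}
      |]}""".stripMargin

  def main(args: Array[String]): Unit = {
    // NOTE: Avro schemas aren't serializable, hence (like in the patch) the schema is parsed
    //       where the reader runs rather than shipped from the driver
    val requiredAvroSchema = new Schema.Parser().parse(requiredAvroSchemaJson)

    // Stand-in for reader.getRecordIterator(requiredAvroSchema)
    val recordIterator: Iterator[GenericRecord] = Iterator(
      new GenericRecordBuilder(requiredAvroSchema).set("key", "k1").set("value", 1L).build(),
      new GenericRecordBuilder(requiredAvroSchema).set("key", "k2").set("value", 2L).build()
    )

    // Stand-in for the Avro-to-row converter: pick field values in schema order
    def toRow(record: GenericRecord): List[AnyRef] =
      requiredAvroSchema.getFields.asScala.map(f => record.get(f.name())).toList

    // Prints the projected field values of each record, e.g. List(k1, 1)
    recordIterator.map(toRow).foreach(println)
  }
}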