From 867d94ab4d89d925eeea275990c294d98f1a8653 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 26 Jul 2019 23:54:23 +0800 Subject: [PATCH 1/6] code cleanup for columnar execution framework --- .../apache/spark/sql/execution/Columnar.scala | 120 ++++++++---------- .../sql/execution/DataSourceScanExec.scala | 60 ++++++--- .../sql/execution/WholeStageCodegenExec.scala | 53 ++------ .../columnar/InMemoryTableScanExec.scala | 13 +- .../v2/DataSourceV2ScanExecBase.scala | 7 +- .../org/apache/spark/sql/SubquerySuite.scala | 4 +- .../LogicalPlanTagInSparkPlanSuite.scala | 23 ++-- .../execution/WholeStageCodegenSuite.scala | 44 ++++++- .../columnar/InMemoryColumnarQuerySuite.scala | 2 +- .../python/BatchEvalPythonExecSuite.scala | 8 +- 10 files changed, 181 insertions(+), 153 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 4385843d90112..3954fa6ee4b0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -57,8 +57,8 @@ class ColumnarRule { * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations. */ -case class ColumnarToRowExec(child: SparkPlan) - extends UnaryExecNode with CodegenSupport { +case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { + assert(child.supportsColumnar) override def output: Seq[Attribute] = child.output @@ -66,31 +66,27 @@ case class ColumnarToRowExec(child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering + // `ColumnarToRowExec` is the beginning of a codegen stage, so it doesn't need to copy result and + // it can add limit condition check. + override def needCopyResult: Boolean = false + protected override def canCheckLimitNotReached: Boolean = true + override lazy val metrics: Map[String, SQLMetric] = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time") + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches") ) override def doExecute(): RDD[InternalRow] = { - val numOutputRows = longMetric("numOutputRows") val numInputBatches = longMetric("numInputBatches") - val scanTime = longMetric("scanTime") - // UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy - @transient lazy val outputProject = UnsafeProjection.create(output, output) - val batches = child.executeColumnar() - batches.flatMap(batch => { - val batchStartNs = System.nanoTime() - numInputBatches += 1 - // In order to match the numOutputRows metric in the generated code we update - // numOutputRows for each batch. This is less accurate than doing it at output - // because it will over count the number of rows output in the case of a limit, - // but it is more efficient. - numOutputRows += batch.numRows() - val ret = batch.rowIterator().asScala - scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000)) - ret.map(outputProject) - }) + // This avoids calling `output` in the RDD closure, so that we don't need to include the entire + // plan (this) in the closure. 
+ val localOutput = this.output + child.executeColumnar().mapPartitionsInternal { batches => + val outputProject = UnsafeProjection.create(localOutput, localOutput) + batches.flatMap { batch => + numInputBatches += 1 + batch.rowIterator().asScala.map(outputProject) + } + } } /** @@ -134,11 +130,7 @@ case class ColumnarToRowExec(child: SparkPlan) v => s"$v = inputs[0];") // metrics - val numOutputRows = metricTerm(ctx, "numOutputRows") val numInputBatches = metricTerm(ctx, "numInputBatches") - val scanTimeMetric = metricTerm(ctx, "scanTime") - val scanTimeTotalNs = - ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0 val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.addMutableState(columnarBatchClz, "batch") @@ -156,15 +148,12 @@ case class ColumnarToRowExec(child: SparkPlan) val nextBatchFuncName = ctx.addNewFunction(nextBatch, s""" |private void $nextBatch() throws java.io.IOException { - | long getBatchStart = System.nanoTime(); | if ($input.hasNext()) { | $batch = ($columnarBatchClz)$input.next(); - | $numOutputRows.add($batch.numRows()); + | ${numInputBatches}.add(1); | $idx = 0; | ${columnAssigns.mkString("", "\n", "\n")} - | ${numInputBatches}.add(1); | } - | $scanTimeTotalNs += System.nanoTime() - getBatchStart; |}""".stripMargin) ctx.currentVars = null @@ -184,7 +173,7 @@ case class ColumnarToRowExec(child: SparkPlan) |if ($batch == null) { | $nextBatchFuncName(); |} - |while ($batch != null) { + |while ($limitNotReachedCond $batch != null) { | int $numRows = $batch.numRows(); | int $localEnd = $numRows - $idx; | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { @@ -196,13 +185,11 @@ case class ColumnarToRowExec(child: SparkPlan) | $batch = null; | $nextBatchFuncName(); |} - |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); - |$scanTimeTotalNs = 0; """.stripMargin } override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].inputRDDs() + Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure } } @@ -439,47 +426,46 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode { // Instead of creating a new config we are reusing columnBatchSize. In the future if we do // combine with some of the Arrow conversion tools we will need to unify some of the configs. val numRows = conf.columnBatchSize - val converters = new RowToColumnConverter(schema) - val rowBased = child.execute() - rowBased.mapPartitions(rowIterator => { - new Iterator[ColumnarBatch] { - var cb: ColumnarBatch = null - - TaskContext.get().addTaskCompletionListener[Unit] { _ => - if (cb != null) { - cb.close() - cb = null + // This avoids calling `output` in the RDD closure, so that we don't need to include the entire + // plan (this) in the closure. 
+ val localSchema = this.schema + child.execute().mapPartitionsInternal { rowIterator => + if (rowIterator.hasNext) { + new Iterator[ColumnarBatch] { + private val converters = new RowToColumnConverter(localSchema) + private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { + OffHeapColumnVector.allocateColumns(numRows, localSchema) + } else { + OnHeapColumnVector.allocateColumns(numRows, localSchema) } - } + private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - override def hasNext: Boolean = { - rowIterator.hasNext - } - - override def next(): ColumnarBatch = { - if (cb != null) { + TaskContext.get().addTaskCompletionListener[Unit] { _ => cb.close() - cb = null } - val columnVectors : Array[WritableColumnVector] = - if (enableOffHeapColumnVector) { - OffHeapColumnVector.allocateColumns(numRows, schema).toArray - } else { - OnHeapColumnVector.allocateColumns(numRows, schema).toArray + + override def hasNext: Boolean = { + rowIterator.hasNext + } + + override def next(): ColumnarBatch = { + cb.setNumRows(0) + var rowCount = 0 + while (rowCount < numRows && rowIterator.hasNext) { + val row = rowIterator.next() + converters.convert(row, vectors.toArray) + rowCount += 1 } - var rowCount = 0 - while (rowCount < numRows && rowIterator.hasNext) { - val row = rowIterator.next() - converters.convert(row, columnVectors) - rowCount += 1 + cb.setNumRows(rowCount) + numInputRows += rowCount + numOutputBatches += 1 + cb } - cb = new ColumnarBatch(columnVectors.toArray, rowCount) - numInputRows += rowCount - numOutputBatches += 1 - cb } + } else { + Iterator.empty } - }) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 728ac3a466fbf..9c28568839ce6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -334,37 +334,63 @@ case class FileSourceScanExec( inputRDD :: Nil } - override lazy val metrics = - Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"), - "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"), + "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"), + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + + private abstract class ScanTimeTrackingIterator[T]( + fileScanIterator: Iterator[T], + scanTimeMetrics: SQLMetric) extends Iterator[T] { + + override def hasNext: 
Boolean = { + // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call. + val startNs = System.nanoTime() + val re = fileScanIterator.hasNext + scanTimeMetrics += ((System.nanoTime() - startNs) / (1000 * 1000)) + re + } + } protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") - + val scanTime = longMetric("scanTime") if (needsUnsafeRowConversion) { inputRDD.mapPartitionsWithIndexInternal { (index, iter) => val proj = UnsafeProjection.create(schema) proj.initialize(index) - iter.map( r => { - numOutputRows += 1 - proj(r) - }) + + new ScanTimeTrackingIterator[InternalRow](iter, scanTime) { + override def next(): InternalRow = { + numOutputRows += 1 + proj(iter.next()) + } + } } } else { - inputRDD.map { r => - numOutputRows += 1 - r + inputRDD.mapPartitionsInternal { iter => + new ScanTimeTrackingIterator[InternalRow](iter, scanTime) { + override def next(): InternalRow = { + numOutputRows += 1 + iter.next() + } + } } } } protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("numOutputRows") - inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch => - numOutputRows += batch.numRows() - batch + val scanTime = longMetric("scanTime") + inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches => + new ScanTimeTrackingIterator[ColumnarBatch](batches, scanTime) { + override def next(): ColumnarBatch = { + val batch = batches.next() + numOutputRows += batch.numRows() + batch + } + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index d9d9b1f9016ea..33ac9eb25b388 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution -import java.io.Writer import java.util.Locale import java.util.concurrent.atomic.AtomicInteger @@ -491,12 +490,8 @@ trait InputRDDCodegen extends CodegenSupport { * * This is the leaf node of a tree with WholeStageCodegen that is used to generate code * that consumes an RDD iterator of InternalRow. - * - * @param isChildColumnar true if the inputRDD is really columnar data hidden by type erasure, - * false if inputRDD is really an RDD[InternalRow] */ -case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean) - extends UnaryExecNode with InputRDDCodegen { +case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen { override def output: Seq[Attribute] = child.output @@ -522,13 +517,7 @@ case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean) child.executeColumnar() } - override def inputRDD: RDD[InternalRow] = { - if (isChildColumnar) { - child.executeColumnar().asInstanceOf[RDD[InternalRow]] // Hack because of type erasure - } else { - child.execute() - } - } + override def inputRDD: RDD[InternalRow] = child.execute() // This is a leaf node so the node can produce limit not reached checks. override protected def canCheckLimitNotReached: Boolean = true @@ -870,59 +859,45 @@ case class CollapseCodegenStages( /** * Inserts an InputAdapter on top of those that do not support codegen. 
*/ - private def insertInputAdapter(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = { - val isColumnar = adjustColumnar(plan, isColumnarInput) + private def insertInputAdapter(plan: SparkPlan): SparkPlan = { plan match { case p if !supportCodegen(p) => // collapse them recursively - InputAdapter(insertWholeStageCodegen(p, isColumnar), isColumnar) + InputAdapter(insertWholeStageCodegen(p)) case j: SortMergeJoinExec => // The children of SortMergeJoin should do codegen separately. j.withNewChildren(j.children.map( - child => InputAdapter(insertWholeStageCodegen(child, isColumnar), isColumnar))) - case p => - p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar))) + child => InputAdapter(insertWholeStageCodegen(child)))) + // `ColumnarToRowExec` is kind of a leaf node to whole-stage-codegen. Its generated code can + // process data from the input RDD directly. + case c: ColumnarToRowExec => c + case p => p.withNewChildren(p.children.map(insertInputAdapter)) } } /** * Inserts a WholeStageCodegen on top of those that support codegen. */ - private def insertWholeStageCodegen(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = { - val isColumnar = adjustColumnar(plan, isColumnarInput) + private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = { plan match { // For operators that will output domain object, do not insert WholeStageCodegen for it as // domain object can not be written into unsafe row. case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] => - plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, isColumnar))) + plan.withNewChildren(plan.children.map(insertWholeStageCodegen)) case plan: LocalTableScanExec => // Do not make LogicalTableScanExec the root of WholeStageCodegen // to support the fast driver-local collect/take paths. plan case plan: CodegenSupport if supportCodegen(plan) => - WholeStageCodegenExec( - insertInputAdapter(plan, isColumnar))(codegenStageCounter.incrementAndGet()) + WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet()) case other => - other.withNewChildren(other.children.map(insertWholeStageCodegen(_, isColumnar))) + other.withNewChildren(other.children.map(insertWholeStageCodegen)) } } - /** - * Depending on the stage in the plan and if we currently are columnar or not - * return if we are still columnar or not. 
- */ - private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean = - // We are walking up the plan, so columnar starts when we transition to rows - // and ends when we transition to columns - plan match { - case c2r: ColumnarToRowExec => true - case r2c: RowToColumnarExec => false - case _ => isColumnar - } - def apply(plan: SparkPlan): SparkPlan = { if (conf.wholeStageEnabled) { - insertWholeStageCodegen(plan, false) + insertWholeStageCodegen(plan) } else { plan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 3566ab1aa5a33..e197cd85f7bf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -115,22 +115,19 @@ case class InMemoryTableScanExec( val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled buffers .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled)) - .map(b => { - numOutputRows += b.numRows() - b - }) + .map { buffer => + numOutputRows += buffer.numRows() + buffer + } } private lazy val inputRDD: RDD[InternalRow] = { - val buffers = filteredCachedBatches() - val offHeapColumnVectorEnabled = conf.offHeapColumnVectorEnabled - val numOutputRows = longMetric("numOutputRows") - if (enableAccumulatorsForTest) { readPartitions.setValue(0) readBatches.setValue(0) } + val numOutputRows = longMetric("numOutputRows") // Using these variables here to avoid serialization of entire objects (if referenced // directly) within the map Partitions closure. val relOutput: AttributeSeq = relation.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index c5c902ffc4104..74fc5432ea82c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -79,10 +79,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("numOutputRows") - inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { - b => - numOutputRows += b.numRows() - b + inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b => + numOutputRows += b.numRows() + b } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 38a51474cbcd8..9f48ade4b9acb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1293,8 +1293,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row(0, 0), Row(2, 0))) // need to execute the query before we can examine fs.inputRDDs() assert(df.queryExecution.executedPlan match { - case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter( - fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _), _))) => + case WholeStageCodegenExec(ColumnarToRowExec( + fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _))) => partitionFilters.exists(ExecSubqueryExpression.hasSubquery) && fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala index b1143484a85e8..9bef43f1f1125 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala @@ -44,14 +44,11 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { } // A scan plan tree is a plan tree that has a leaf node under zero or more Project/Filter nodes. - // Because of how codegen and columnar to row transitions work, we may have InputAdaptors - // and ColumnarToRow transformations in the middle of it, but they will not have the tag - // we want, so skip them if they are the first thing we see - private def isScanPlanTree(plan: SparkPlan, first: Boolean): Boolean = plan match { - case i: InputAdapter if !first => isScanPlanTree(i.child, false) - case c: ColumnarToRowExec if !first => isScanPlanTree(c.child, false) - case p: ProjectExec => isScanPlanTree(p.child, false) - case f: FilterExec => isScanPlanTree(f.child, false) + // We may add `ColumnarToRowExec` above the scan node after planning, we should also skip it. + private def isScanPlanTree(plan: SparkPlan): Boolean = plan match { + case c: ColumnarToRowExec => isScanPlanTree(c.child) + case p: ProjectExec => isScanPlanTree(p.child) + case f: FilterExec => isScanPlanTree(f.child) case _: LeafExecNode => true case _ => false } @@ -92,7 +89,13 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { case _: SubqueryExec | _: ReusedSubqueryExec => assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty) - case _ if isScanPlanTree(plan, true) => + case _ if isScanPlanTree(plan) => + // `ColumnarToRowExec` is added outside of the planner, which doesn't have the logical plan + // tag. + val actualPlan = plan match { + case c: ColumnarToRowExec => c.child + case _ => plan + } // The strategies for planning scan can remove or add FilterExec/ProjectExec nodes, // so it's not simple to check. Instead, we only check that the origin LogicalPlan // contains the corresponding leaf node of the SparkPlan. @@ -100,7 +103,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { // logical = Project(Filter(Scan A)) // physical = ProjectExec(ScanExec A) // we only check that leaf modes match between logical and physical plan. 
- val logicalLeaves = getLogicalPlan(plan).collectLeaves() + val logicalLeaves = getLogicalPlan(actualPlan).collectLeaves() val physicalLeaves = plan.collectLeaves() assert(logicalLeaves.length == 1) assert(physicalLeaves.length == 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 59b9e155049b5..6b42cbe1dd88a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.aggregate.HashAggregateExec -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.expressions.scalalang.typed @@ -120,6 +120,29 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } + test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec") { + import testImplicits._ + + val dsInt = spark.range(3).cache() + dsInt.count() + val dsIntFilter = dsInt.filter(_ > 0) + val planInt = dsIntFilter.queryExecution.executedPlan + assert(planInt.collect { + case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(_: InMemoryTableScanExec))) => () + }.length == 1) + assert(dsIntFilter.collect() === Array(1, 2)) + + // cache for string type is not supported for InMemoryTableScanExec + val dsString = spark.range(3).map(_.toString).cache() + dsString.count() + val dsStringFilter = dsString.filter(_ == "1") + val planString = dsStringFilter.queryExecution.executedPlan + assert(planString.collect { + case _: ColumnarToRowExec => () + }.isEmpty) + assert(dsStringFilter.collect() === Array("1")) + } + test("SPARK-19512 codegen for comparing structs is incorrect") { // this would raise CompileException before the fix spark.range(10) @@ -196,6 +219,25 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) } + ignore("bytecode of batch file scan exceeds the limit of WHOLESTAGE_HUGE_METHOD_LIMIT") { + import testImplicits._ + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(10).select(Seq.tabulate(201) {i => ('id + i).as(s"c$i")} : _*) + df.write.mode(SaveMode.Overwrite).parquet(path) + + withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202", + SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "2000") { + // wide table batch scan causes the byte code of codegen exceeds the limit of + // WHOLESTAGE_HUGE_METHOD_LIMIT + val df2 = spark.read.parquet(path) + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + assert(fileScan2.asInstanceOf[FileSourceScanExec].supportsColumnar) + checkAnswer(df2, df) + } + } + } + test("Control splitting consume function by operators with config") { import testImplicits._ val df = spark.range(10).select(Seq.tabulate(2) {i => ('id + i).as(s"c$i")} : _*) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 711ecf1738ab1..74aada8d64fa7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -487,7 +487,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val planBeforeFilter = df2.queryExecution.executedPlan.collect { case FilterExec(_, c: ColumnarToRowExec) => c.child - case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child + case WholeStageCodegenExec(FilterExec(_, c: ColumnarToRowExec)) => c.child } assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala index 8a18a1ab5406f..289cc667a1c66 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala @@ -50,7 +50,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { case f @ FilterExec( And(_: AttributeReference, _: AttributeReference), - InputAdapter(_: BatchEvalPythonExec, _)) => f + InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(FilterExec(_: In, _))) => b } assert(qualifiedPlanNodes.size == 2) @@ -60,7 +60,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val df = Seq(("Hello", 4)).toDF("a", "b") .where("dummyPythonUDF(a, dummyPythonUDF(a, b)) and a in (3, 4)") val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { - case f @ FilterExec(_: AttributeReference, InputAdapter(_: BatchEvalPythonExec, _)) => f + case f @ FilterExec(_: AttributeReference, InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(FilterExec(_: In, _))) => b } assert(qualifiedPlanNodes.size == 2) @@ -72,7 +72,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { case f @ FilterExec( And(_: AttributeReference, _: GreaterThan), - InputAdapter(_: BatchEvalPythonExec, _)) => f + InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b } assert(qualifiedPlanNodes.size == 2) @@ -85,7 +85,7 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext { val qualifiedPlanNodes = df.queryExecution.executedPlan.collect { case f @ FilterExec( And(_: AttributeReference, _: GreaterThan), - InputAdapter(_: BatchEvalPythonExec, _)) => f + InputAdapter(_: BatchEvalPythonExec)) => f case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b } assert(qualifiedPlanNodes.size == 2) From 009d760832c436e818953b09a6d754570aec8e8e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 27 Jul 2019 10:31:56 +0800 Subject: [PATCH 2/6] add back numOutputRows metrics --- .../scala/org/apache/spark/sql/execution/Columnar.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 3954fa6ee4b0b..1e0eedf1b789a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -72,10 +72,12 @@ case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with Codege protected override def canCheckLimitNotReached: Boolean = true override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches") ) override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") val numInputBatches = longMetric("numInputBatches") // This avoids calling `output` in the RDD closure, so that we don't need to include the entire // plan (this) in the closure. @@ -84,6 +86,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with Codege val outputProject = UnsafeProjection.create(localOutput, localOutput) batches.flatMap { batch => numInputBatches += 1 + numOutputRows += batch.numRows() batch.rowIterator().asScala.map(outputProject) } } @@ -130,6 +133,7 @@ case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with Codege v => s"$v = inputs[0];") // metrics + val numOutputRows = metricTerm(ctx, "numOutputRows") val numInputBatches = metricTerm(ctx, "numInputBatches") val columnarBatchClz = classOf[ColumnarBatch].getName @@ -150,7 +154,8 @@ case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with Codege |private void $nextBatch() throws java.io.IOException { | if ($input.hasNext()) { | $batch = ($columnarBatchClz)$input.next(); - | ${numInputBatches}.add(1); + | $numInputBatches.add(1); + | $numOutputRows.add($batch.numRows()); | $idx = 0; | ${columnAssigns.mkString("", "\n", "\n")} | } From ec2a2b841e96fa630804eb1eda2dd15dca0aac72 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 29 Jul 2019 12:07:34 +0800 Subject: [PATCH 3/6] address comments --- .../scala/org/apache/spark/sql/execution/Columnar.scala | 7 +++---- .../spark/sql/execution/WholeStageCodegenExec.scala | 9 ++++++--- .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 4 ++-- .../spark/sql/execution/WholeStageCodegenSuite.scala | 3 ++- .../execution/columnar/InMemoryColumnarQuerySuite.scala | 2 +- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 1e0eedf1b789a..d82b343b3742f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -66,9 +66,8 @@ case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with Codege override def outputOrdering: Seq[SortOrder] = child.outputOrdering - // `ColumnarToRowExec` is the beginning of a codegen stage, so it doesn't need to copy result and - // it can add limit condition check. - override def needCopyResult: Boolean = false + // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the + // codegen stage and needs to do the limit check. 
protected override def canCheckLimitNotReached: Boolean = true override lazy val metrics: Map[String, SQLMetric] = Map( @@ -431,7 +430,7 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode { // Instead of creating a new config we are reusing columnBatchSize. In the future if we do // combine with some of the Arrow conversion tools we will need to unify some of the configs. val numRows = conf.columnBatchSize - // This avoids calling `output` in the RDD closure, so that we don't need to include the entire + // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire // plan (this) in the closure. val localSchema = this.schema child.execute().mapPartitionsInternal { rowIterator => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 33ac9eb25b388..73ed2a8a54350 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -517,6 +517,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod child.executeColumnar() } + // `InputAdapter` can only generate code to process the rows from its child. If the child produces + // columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by + // overriding `inputRDD`. override def inputRDD: RDD[InternalRow] = child.execute() // This is a leaf node so the node can produce limit not reached checks. @@ -868,9 +871,6 @@ case class CollapseCodegenStages( // The children of SortMergeJoin should do codegen separately. j.withNewChildren(j.children.map( child => InputAdapter(insertWholeStageCodegen(child)))) - // `ColumnarToRowExec` is kind of a leaf node to whole-stage-codegen. Its generated code can - // process data from the input RDD directly. - case c: ColumnarToRowExec => c case p => p.withNewChildren(p.children.map(insertInputAdapter)) } } @@ -889,6 +889,9 @@ case class CollapseCodegenStages( // to support the fast driver-local collect/take paths. plan case plan: CodegenSupport if supportCodegen(plan) => + // The whole-stage-codegen framework is row-based. If a plan supports columnar execution, + // it can't support whole-stage-codegen at the same time. 
+ assert(!plan.supportsColumnar) WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet()) case other => other.withNewChildren(other.children.map(insertWholeStageCodegen)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 9f48ade4b9acb..b46abdb48e738 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1293,8 +1293,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row(0, 0), Row(2, 0))) // need to execute the query before we can examine fs.inputRDDs() assert(df.queryExecution.executedPlan match { - case WholeStageCodegenExec(ColumnarToRowExec( - fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _))) => + case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter( + fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)))) => partitionFilters.exists(ExecSubqueryExpression.hasSubquery) && fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 6b42cbe1dd88a..55dff16887cb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -128,7 +128,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { val dsIntFilter = dsInt.filter(_ > 0) val planInt = dsIntFilter.queryExecution.executedPlan assert(planInt.collect { - case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(_: InMemoryTableScanExec))) => () + case WholeStageCodegenExec(FilterExec(_, + ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec)))) => () }.length == 1) assert(dsIntFilter.collect() === Array(1, 2)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 74aada8d64fa7..711ecf1738ab1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -487,7 +487,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val planBeforeFilter = df2.queryExecution.executedPlan.collect { case FilterExec(_, c: ColumnarToRowExec) => c.child - case WholeStageCodegenExec(FilterExec(_, c: ColumnarToRowExec)) => c.child + case WholeStageCodegenExec(FilterExec(_, ColumnarToRowExec(i: InputAdapter))) => i.child } assert(planBeforeFilter.head.isInstanceOf[InMemoryTableScanExec]) From 6bcbcc602b006b2881d74505a4136b6b22eb50be Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 30 Jul 2019 14:35:37 +0800 Subject: [PATCH 4/6] fix test --- .../execution/LogicalPlanTagInSparkPlanSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala index 9bef43f1f1125..aa83b9b11dcfc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala @@ -44,9 +44,9 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { } // A scan plan tree is a plan tree that has a leaf node under zero or more Project/Filter nodes. - // We may add `ColumnarToRowExec` above the scan node after planning, we should also skip it. + // We may add `ColumnarToRowExec` and `InputAdapter` above the scan node after planning. private def isScanPlanTree(plan: SparkPlan): Boolean = plan match { - case c: ColumnarToRowExec => isScanPlanTree(c.child) + case ColumnarToRowExec(i: InputAdapter) => isScanPlanTree(i.child) case p: ProjectExec => isScanPlanTree(p.child) case f: FilterExec => isScanPlanTree(f.child) case _: LeafExecNode => true @@ -90,12 +90,13 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite { assert(plan.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isEmpty) case _ if isScanPlanTree(plan) => - // `ColumnarToRowExec` is added outside of the planner, which doesn't have the logical plan - // tag. + // `ColumnarToRowExec` and `InputAdapter` are added outside of the planner, which doesn't + // have the logical plan tag. val actualPlan = plan match { - case c: ColumnarToRowExec => c.child + case ColumnarToRowExec(i: InputAdapter) => i.child case _ => plan } + // The strategies for planning scan can remove or add FilterExec/ProjectExec nodes, // so it's not simple to check. Instead, we only check that the origin LogicalPlan // contains the corresponding leaf node of the SparkPlan. From af177aa8240691db9cb4a3c4bd2e376951596d9a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 1 Aug 2019 23:14:21 +0800 Subject: [PATCH 5/6] address comments --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 9c28568839ce6..6fb827a946dd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -347,9 +347,9 @@ case class FileSourceScanExec( override def hasNext: Boolean = { // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call. val startNs = System.nanoTime() - val re = fileScanIterator.hasNext - scanTimeMetrics += ((System.nanoTime() - startNs) / (1000 * 1000)) - re + val res = fileScanIterator.hasNext + scanTimeMetrics += NANOSECONDS.toMillis(System.nanoTime() - startNs) + res } } From 308fc11bfa726143d627abfe37b038cd655f6051 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 2 Aug 2019 11:59:09 +0800 Subject: [PATCH 6/6] address comments --- .../apache/spark/sql/execution/Columnar.scala | 4 +- .../sql/execution/DataSourceScanExec.scala | 54 +++++++++---------- .../sql/execution/WholeStageCodegenExec.scala | 2 +- 3 files changed, 29 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index d82b343b3742f..9d1636ccf2718 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -82,11 +82,11 @@ case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with Codege // plan (this) in the closure. 
val localOutput = this.output child.executeColumnar().mapPartitionsInternal { batches => - val outputProject = UnsafeProjection.create(localOutput, localOutput) + val toUnsafe = UnsafeProjection.create(localOutput, localOutput) batches.flatMap { batch => numInputBatches += 1 numOutputRows += batch.numRows() - batch.rowIterator().asScala.map(outputProject) + batch.rowIterator().asScala.map(toUnsafe) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 6fb827a946dd6..984f4d3474b03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -337,44 +337,33 @@ case class FileSourceScanExec( override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"), - "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) - - private abstract class ScanTimeTrackingIterator[T]( - fileScanIterator: Iterator[T], - scanTimeMetrics: SQLMetric) extends Iterator[T] { - - override def hasNext: Boolean = { - // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call. - val startNs = System.nanoTime() - val res = fileScanIterator.hasNext - scanTimeMetrics += NANOSECONDS.toMillis(System.nanoTime() - startNs) - res + "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time") + ) ++ { + // Tracking scan time has overhead, we can't afford to do it for each row, and can only do + // it for each batch. + if (supportsColumnar) { + Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + } else { + None } } protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") - val scanTime = longMetric("scanTime") if (needsUnsafeRowConversion) { inputRDD.mapPartitionsWithIndexInternal { (index, iter) => - val proj = UnsafeProjection.create(schema) - proj.initialize(index) - - new ScanTimeTrackingIterator[InternalRow](iter, scanTime) { - override def next(): InternalRow = { - numOutputRows += 1 - proj(iter.next()) - } + val toUnsafe = UnsafeProjection.create(schema) + toUnsafe.initialize(index) + iter.map { row => + numOutputRows += 1 + toUnsafe(row) } } } else { inputRDD.mapPartitionsInternal { iter => - new ScanTimeTrackingIterator[InternalRow](iter, scanTime) { - override def next(): InternalRow = { - numOutputRows += 1 - iter.next() - } + iter.map { row => + numOutputRows += 1 + row } } } @@ -384,7 +373,16 @@ case class FileSourceScanExec( val numOutputRows = longMetric("numOutputRows") val scanTime = longMetric("scanTime") inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches => - new ScanTimeTrackingIterator[ColumnarBatch](batches, scanTime) { + new Iterator[ColumnarBatch] { + + override def hasNext: Boolean = { + // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call. 
+ val startNs = System.nanoTime() + val res = batches.hasNext + scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs) + res + } + override def next(): ColumnarBatch = { val batch = batches.next() numOutputRows += batch.numRows() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 73ed2a8a54350..5fda272ce21a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -519,7 +519,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod // `InputAdapter` can only generate code to process the rows from its child. If the child produces // columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by - // overriding `inputRDD`. + // overriding `inputRDDs` and calling `InputAdapter#executeColumnar` directly. override def inputRDD: RDD[InternalRow] = child.execute() // This is a leaf node so the node can produce limit not reached checks.
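
For reviewers who want to poke at the end result locally, here is a small standalone sketch. It is not part of the patch: the object name, the /tmp path, and the defaults it relies on (vectorized parquet reader and whole-stage codegen enabled) are assumptions made for illustration. It checks that a columnar file scan planned on this branch ends up in the ColumnarToRow(InputAdapter(FileScan)) shape that the updated SubquerySuite and WholeStageCodegenSuite assertions match on.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{ColumnarToRowExec, FileSourceScanExec, InputAdapter}

object ColumnarPlanShapeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("columnar-plan-shape-demo")
      .getOrCreate()

    // A flat-schema parquet scan goes through the vectorized reader, so
    // FileSourceScanExec.supportsColumnar is true and the plan preparation rules
    // insert a ColumnarToRowExec (and, inside the codegen stage, an InputAdapter)
    // above it.
    val path = "/tmp/columnar-plan-shape-demo"
    spark.range(10).write.mode("overwrite").parquet(path)
    val df = spark.read.parquet(path).filter("id > 3")

    // Same pattern the updated test suites assert on: ColumnarToRowExec sits on
    // top of InputAdapter, which wraps the columnar scan and is exposed to
    // whole-stage codegen through ColumnarToRowExec.inputRDDs().
    val columnarScanStages = df.queryExecution.executedPlan.collect {
      case c @ ColumnarToRowExec(InputAdapter(_: FileSourceScanExec)) => c
    }
    assert(columnarScanStages.nonEmpty,
      "expected a ColumnarToRow(InputAdapter(FileScan parquet)) stage")

    df.show()  // runs the generated code that is fed directly by the columnar RDD
    spark.stop()
  }
}

With this series the per-batch "scan time" metric is only reported on the columnar path, so the file scan node of the query above should still show scan time in the SQL UI, while a purely row-based scan would not.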