From 14fb62149cbd5f94711c1315885a7b751a70bd07 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Fri, 9 May 2025 11:43:53 -0700
Subject: [PATCH 01/13] init

---
 .../spark/sql/execution/ExistingRDD.scala     | 48 +++++++++++++++++++
 .../spark/sql/execution/SparkStrategies.scala |  4 +-
 .../sql/execution/WholeStageCodegenExec.scala |  1 +
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 14 +++++-
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 1ac7aa00d98c6..2d9ca9e34f30e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -319,3 +319,51 @@ case class RDDScanExec(
 
   override def getStream: Option[SparkDataStream] = stream
 }
+
+/**
+ * A special case of RDDScanExec that is used to represent a scan without a `FROM` clause.
+ * For example, 'select version()'.
+ *
+ * We do not extend `RDDScanExec` in order to avoid complexity due to `TreeNode.makeCopy` and
+ * `TreeNode`'s general use of reflection.
+ */
+case class OneRowRelationExec() extends LeafExecNode
+  with StreamSourceAwareSparkPlan
+  with InputRDDCodegen {
+
+  override val nodeName: String = s"Scan OneRowRelation"
+
+  override val output: Seq[Attribute] = Nil
+
+  val rdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+      val proj = UnsafeProjection.create(schema)
+      proj.initialize(index)
+      iter.map { r =>
+        numOutputRows += 1
+        proj(r)
+      }
+    }
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}"
+  }
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  // Input can be InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  override protected def doCanonicalize(): SparkPlan = {
+    super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
+  }
+
+  override def getStream: Option[SparkDataStream] = None
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 34d24e3b1e7f1..da1ccb2532f0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -690,8 +690,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  protected lazy val singleRowRdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
-
   object InMemoryScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
@@ -1054,7 +1052,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         generator, g.requiredChildOutput, outer, g.qualifiedGeneratorOutput,
         planLater(child)) :: Nil
       case _: logical.OneRowRelation =>
-        execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
+        execution.OneRowRelationExec() :: Nil
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 1ee467ef3554b..21b5177fe2208 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -56,6 +56,7 @@ trait CodegenSupport extends SparkPlan {
     case _: SortMergeJoinExec => "smj"
     case _: BroadcastNestedLoopJoinExec => "bnlj"
     case _: RDDScanExec => "rdd"
+    case _: OneRowRelationExec => "orr"
     case _: DataSourceScanExec => "scan"
     case _: InMemoryTableScanExec => "memoryScan"
     case _: WholeStageCodegenExec => "wholestagecodegen"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 138a29c6ae804..26aa4b6b5210f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedCo
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression, Sort}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.execution.{CommandResultExec, UnionExec}
+import org.apache.spark.sql.execution.{CommandResultExec, OneRowRelationExec, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -4962,6 +4962,18 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       parameters = Map("plan" -> "'Aggregate [groupingsets(Vector(0), posexplode(array(col)))]")
     )
   }
+
+  Seq(true, false).foreach { codegenEnabled =>
+    test(s"SPARK-52060: one row relation with codegen enabled - $codegenEnabled") {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) {
+        val df = spark.sql("select 'test' stringCol")
+        checkAnswer(df, Row("test"))
+        val plan = df.queryExecution.executedPlan
+        val oneRowRelationExists = plan.find(_.isInstanceOf[OneRowRelationExec]).isDefined
+        assert(oneRowRelationExists)
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
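
[Series note: not part of the commits] PATCH 01 reroutes planning of `logical.OneRowRelation` from a generic `RDDScanExec` to the dedicated node. A rough spark-shell sketch of the expected effect (illustrative output, not a transcript; the exact plan text depends on the build):

    // Any query without a FROM clause now scans the new node, which
    // renders via nodeName/simpleString as "Scan OneRowRelation[]".
    spark.sql("select version()").explain()
    // == Physical Plan ==
    // *(1) Project [version() AS version()#0]
    // +- *(1) Scan OneRowRelation[]
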
From d8a6ea6f4c4616180c91a59830624805581ff591 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Mon, 19 May 2025 15:19:34 -0700
Subject: [PATCH 02/13] comments

---
 .../apache/spark/sql/execution/ExistingRDD.scala | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 2d9ca9e34f30e..f33770f1ad994 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -321,14 +321,12 @@ case class RDDScanExec(
 }
 
 /**
- * A special case of RDDScanExec that is used to represent a scan without a `FROM` clause.
- * For example, 'select version()'.
+ * A physical plan node for `OneRowRelation` for scans with no 'FROM' clause.
  *
  * We do not extend `RDDScanExec` in order to avoid complexity due to `TreeNode.makeCopy` and
  * `TreeNode`'s general use of reflection.
  */
 case class OneRowRelationExec() extends LeafExecNode
-  with StreamSourceAwareSparkPlan
   with InputRDDCodegen {
 
   override val nodeName: String = s"Scan OneRowRelation"
@@ -341,20 +339,17 @@ case class OneRowRelationExec() extends LeafExecNode
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val outputRow = InternalRow.empty
     val numOutputRows = longMetric("numOutputRows")
     rdd.mapPartitionsWithIndexInternal { (index, iter) =>
-      val proj = UnsafeProjection.create(schema)
-      proj.initialize(index)
       iter.map { r =>
         numOutputRows += 1
-        proj(r)
+        outputRow
       }
     }
   }
 
-  override def simpleString(maxFields: Int): String = {
-    s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}"
-  }
+  override def simpleString(maxFields: Int): String = s"$nodeName[]"
 
   override def inputRDD: RDD[InternalRow] = rdd
@@ -365,5 +360,5 @@ case class OneRowRelationExec() extends LeafExecNode
     super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
   }
 
-  override def getStream: Option[SparkDataStream] = None
+  // override def getStream: Option[SparkDataStream] = None
 }

From d784470da9e173da5272acf5a9d1f4219f917eff Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Mon, 19 May 2025 18:23:35 -0700
Subject: [PATCH 03/13] output unsafe row

---
 .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index f33770f1ad994..1deba429174db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -339,8 +339,9 @@ case class OneRowRelationExec() extends LeafExecNode
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val outputRow = InternalRow.empty
     val numOutputRows = longMetric("numOutputRows")
+    val proj = UnsafeProjection.create(schema)
+    val outputRow = proj(InternalRow.empty)
     rdd.mapPartitionsWithIndexInternal { (index, iter) =>
       iter.map { r =>
         numOutputRows += 1
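
[Series note: not part of the commits] PATCH 02 made doExecute return the bare `InternalRow.empty`; PATCH 03 immediately routes it through an `UnsafeProjection` again, presumably because downstream operators and the whole-stage-codegen input path generally expect `UnsafeRow`s. A minimal sketch of what that projection produces (catalyst-internal API, shown for illustration only):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeProjection}

    // A projection over zero expressions yields a zero-column UnsafeRow,
    // which is exactly the one row this node emits.
    val proj = UnsafeProjection.create(Seq.empty[Expression])
    val row = proj(InternalRow.empty)
    assert(row.numFields == 0)
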
From 1d947fc27473358db8acd130886919f20040de81 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Mon, 19 May 2025 18:29:12 -0700
Subject: [PATCH 04/13] cleanup

---
 .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 1deba429174db..61d230d696ace 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -333,7 +333,9 @@ case class OneRowRelationExec() extends LeafExecNode
 
   override val output: Seq[Attribute] = Nil
 
-  val rdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
+  private val emptyRow: InternalRow = InternalRow.empty
+
+  private val rdd = session.sparkContext.parallelize(Seq(emptyRow), 1)
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -341,7 +343,7 @@ case class OneRowRelationExec() extends LeafExecNode
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val proj = UnsafeProjection.create(schema)
-    val outputRow = proj(InternalRow.empty)
+    val outputRow = proj(emptyRow)
     rdd.mapPartitionsWithIndexInternal { (index, iter) =>
       iter.map { r =>
         numOutputRows += 1

From fd016a62c53a6237531f2e844b7258b89785e636 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Mon, 19 May 2025 18:33:25 -0700
Subject: [PATCH 05/13] cleanup

---
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 61d230d696ace..beeaf9427f71d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -356,7 +356,6 @@ case class OneRowRelationExec() extends LeafExecNode
 
   override def inputRDD: RDD[InternalRow] = rdd
 
-  // Input can be InternalRow, has to be turned into UnsafeRows.
   override protected val createUnsafeProjection: Boolean = true
 
   override protected def doCanonicalize(): SparkPlan = {
From eef16fe9825342f00db4031e44fe2a616003ca25 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Tue, 20 May 2025 13:29:31 -0700
Subject: [PATCH 06/13] comments

---
 .../spark/sql/execution/ExistingRDD.scala | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index beeaf9427f71d..dd9109200e8e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -333,30 +333,30 @@ case class OneRowRelationExec() extends LeafExecNode
 
   override val output: Seq[Attribute] = Nil
 
-  private val emptyRow: InternalRow = InternalRow.empty
-
-  private val rdd = session.sparkContext.parallelize(Seq(emptyRow), 1)
+  private val rdd: RDD[UnsafeRow] = {
+    val proj = UnsafeProjection.create(schema)
+    val emptyRow = proj(InternalRow.empty)
+    session.sparkContext.parallelize(Seq(emptyRow), 1)
+  }
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val proj = UnsafeProjection.create(schema)
-    val outputRow = proj(emptyRow)
-    rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+    rdd.mapPartitionsWithIndexInternal { (_, iter) =>
       iter.map { r =>
         numOutputRows += 1
-        outputRow
+        r
       }
     }
   }
 
   override def simpleString(maxFields: Int): String = s"$nodeName[]"
 
-  override def inputRDD: RDD[InternalRow] = rdd
+  override def inputRDD: RDD[InternalRow] = rdd.asInstanceOf[RDD[InternalRow]]
 
-  override protected val createUnsafeProjection: Boolean = true
+  override protected val createUnsafeProjection: Boolean = false
 
   override protected def doCanonicalize(): SparkPlan = {
     super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()

From 1b3bdf8f901a14d83824e82e5d8d4af106d585da Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Fri, 23 May 2025 11:04:31 -0700
Subject: [PATCH 07/13] switch back

---
 .../apache/spark/sql/execution/ExistingRDD.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index dd9109200e8e7..f1b1fa15c62e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -333,11 +333,7 @@ case class OneRowRelationExec() extends LeafExecNode
 
   override val output: Seq[Attribute] = Nil
 
-  private val rdd: RDD[UnsafeRow] = {
-    val proj = UnsafeProjection.create(schema)
-    val emptyRow = proj(InternalRow.empty)
-    session.sparkContext.parallelize(Seq(emptyRow), 1)
-  }
+  private val rdd: RDD[InternalRow] = session.sparkContext.parallelize(Seq(InternalRow.empty), 1)
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -345,18 +341,19 @@ case class OneRowRelationExec() extends LeafExecNode
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     rdd.mapPartitionsWithIndexInternal { (_, iter) =>
+      val proj = UnsafeProjection.create(schema)
       iter.map { r =>
        numOutputRows += 1
-        r
+        proj(r)
       }
     }
   }
 
   override def simpleString(maxFields: Int): String = s"$nodeName[]"
 
-  override def inputRDD: RDD[InternalRow] = rdd.asInstanceOf[RDD[InternalRow]]
+  override def inputRDD: RDD[InternalRow] = rdd
 
-  override protected val createUnsafeProjection: Boolean = false
+  override protected val createUnsafeProjection: Boolean = true
 
   override protected def doCanonicalize(): SparkPlan = {
     super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
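
[Series note: not part of the commits] PATCH 06 built the `UnsafeRow` once on the driver and parallelized it; PATCH 07 switches back to projecting inside each task. The series does not record the reason, but one relevant property is that an `UnsafeProjection` reuses a single mutable output row, so neither the projection nor its result is safe to share across tasks. A sketch of that behavior (catalyst-internal API, for illustration):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Literal, UnsafeProjection}

    // Successive applications return the same mutable UnsafeRow instance,
    // which is why each task creates its own projection in PATCH 07.
    val proj = UnsafeProjection.create(Seq(Literal(1)))
    val a = proj(InternalRow.empty)
    val b = proj(InternalRow.empty)
    assert(a eq b)
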
From 0b3db011b971d9dd8cb56f4cd7ca3f9e51de4079 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Mon, 26 May 2025 16:17:43 -0700
Subject: [PATCH 08/13] comments

---
 .../spark/sql/execution/ExistingRDD.scala | 25 ++++++++++++++-----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index f1b1fa15c62e5..58de6bb625a96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -333,21 +333,24 @@ case class OneRowRelationExec() extends LeafExecNode
 
   override val output: Seq[Attribute] = Nil
 
-  private val rdd: RDD[InternalRow] = session.sparkContext.parallelize(Seq(InternalRow.empty), 1)
+  private val rdd: RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    session
+      .sparkContext
+      .parallelize(Seq(InternalRow()), 1)
+      .mapPartitionsInternal { _ =>
+        val proj = UnsafeProjection.create(Seq.empty[Expression])
+        Iterator(proj.apply(InternalRow.empty)).map { r =>
+          numOutputRows += 1
+          r
+        }
+      }
+  }
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    rdd.mapPartitionsWithIndexInternal { (_, iter) =>
-      val proj = UnsafeProjection.create(schema)
-      iter.map { r =>
-        numOutputRows += 1
-        proj(r)
-      }
-    }
-  }
+  protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def simpleString(maxFields: Int): String = s"$nodeName[]"
 

From 922624eb8a0dd787dfacface18686f3590197bd8 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Mon, 26 May 2025 17:55:16 -0700
Subject: [PATCH 09/13] flip create unsafe projection

---
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 58de6bb625a96..88d6a47573981 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -356,7 +356,7 @@ case class OneRowRelationExec() extends LeafExecNode
 
   override def inputRDD: RDD[InternalRow] = rdd
 
-  override protected val createUnsafeProjection: Boolean = true
+  override protected val createUnsafeProjection: Boolean = false
 
   override protected def doCanonicalize(): SparkPlan = {
     super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
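
[Series note: not part of the commits] PATCH 08 moves row production into the RDD itself: the metric is captured when `rdd` is built (which works because `metrics` is a lazy val) and `doExecute` just returns the RDD; PATCH 09 then flips `createUnsafeProjection` to false since the rows are already `UnsafeRow`s. The construction leans on `mapPartitions` running once per partition regardless of how many elements the partition holds. A self-contained sketch of that invariant with the public RDD API (assuming an active `SparkContext` named `sc`, e.g. in spark-shell):

    // The map function ignores its input iterator and emits exactly one
    // element, so a one-partition RDD always yields exactly one row.
    val one = sc.parallelize(Seq(""), 1).mapPartitions(_ => Iterator(42))
    assert(one.collect().sameElements(Array(42)))
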
From b95fc441f1c2d0a1263c403d0a4ab3cf26fcdb04 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 27 May 2025 15:31:10 +0800
Subject: [PATCH 10/13] Update sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

---
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 88d6a47573981..bc079583b1568 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -337,7 +337,7 @@ case class OneRowRelationExec() extends LeafExecNode
     val numOutputRows = longMetric("numOutputRows")
     session
       .sparkContext
-      .parallelize(Seq(InternalRow()), 1)
+      .parallelize(Seq.empty[Unit], 1)
       .mapPartitionsInternal { _ =>
         val proj = UnsafeProjection.create(Seq.empty[Expression])
         Iterator(proj.apply(InternalRow.empty)).map { r =>

From 332562859d04011660e89e71a42e413da8d1e5e2 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 27 May 2025 17:48:07 +0800
Subject: [PATCH 11/13] Update sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

---
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index bc079583b1568..0b67c41703928 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -337,7 +337,7 @@ case class OneRowRelationExec() extends LeafExecNode
     val numOutputRows = longMetric("numOutputRows")
     session
       .sparkContext
-      .parallelize(Seq.empty[Unit], 1)
+      .parallelize(Seq.empty[Int], 1)
       .mapPartitionsInternal { _ =>
         val proj = UnsafeProjection.create(Seq.empty[Expression])
         Iterator(proj.apply(InternalRow.empty)).map { r =>

From d1e9a48d3655e20cf4b257efb0dd82e4c42c3e65 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 27 May 2025 22:31:29 +0800
Subject: [PATCH 12/13] Update sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

---
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 0b67c41703928..b7ab428c48ff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -337,7 +337,7 @@ case class OneRowRelationExec() extends LeafExecNode
     val numOutputRows = longMetric("numOutputRows")
     session
       .sparkContext
-      .parallelize(Seq.empty[Int], 1)
+      .parallelize(Seq[Int](), 1)
      .mapPartitionsInternal { _ =>
        val proj = UnsafeProjection.create(Seq.empty[Expression])
        Iterator(proj.apply(InternalRow.empty)).map { r =>

From 9f5527e20af4ea6d1fdb415baa83829005661e1f Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 28 May 2025 10:05:30 +0800
Subject: [PATCH 13/13] Update sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

---
 .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b7ab428c48ff1..fc941efb9eaf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -337,7 +337,7 @@ case class OneRowRelationExec() extends LeafExecNode
     val numOutputRows = longMetric("numOutputRows")
     session
       .sparkContext
-      .parallelize(Seq[Int](), 1)
+      .parallelize(Seq(""), 1)
      .mapPartitionsInternal { _ =>
        val proj = UnsafeProjection.create(Seq.empty[Expression])
        Iterator(proj.apply(InternalRow.empty)).map { r =>
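
[Series note: not part of the commits] PATCHES 10-13 only iterate on the seed element of the one-partition RDD: `Seq(InternalRow())` -> `Seq.empty[Unit]` -> `Seq.empty[Int]` -> `Seq[Int]()` -> `Seq("")`. The seed value is never consumed, since `mapPartitionsInternal` ignores its input iterator; only the single partition matters. The final `Seq("")` plausibly just avoids serializing an `InternalRow` into the job while keeping a trivially serializable element; the reasons for the intermediate hops are not recorded in the series itself. A quick check of the invariant with the public RDD API (assuming an active `SparkContext` named `sc`):

    // Whatever the seed, one partition plus a mapPartitions that ignores
    // its input produces exactly one element.
    for (seed <- Seq(Seq(""), Seq[Int](), Seq(1, 2, 3))) {
      val out = sc.parallelize(seed, 1).mapPartitions(_ => Iterator("row"))
      assert(out.collect().length == 1)
    }
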