Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.execution._
import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

// Some SQL statements in this suite exceed the 100-character line limit
// scalastyle:off line.size.limit

/**
 * Verifies that an RDD produced via `dataFrame.queryExecution.toRdd` is re-consumed through
 * [[CHRDDScanTransformer]] (i.e. the RDD scan is offloaded), both when the topmost
 * columnar-to-row conversion was removed while producing the RDD and when it was not.
 */
class RDDScanSuite extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper {

  override protected val needCopyParquetToTablePath = true
  override protected val tablesPath: String = basePath + "/tpch-data"
  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
  override protected val queriesResults: String = rootPath + "queries-output"

  override protected def createTPCHNotNullTables(): Unit = {
    createNotNullTPCHTablesInParquet(tablesPath)
  }

  test("test RDDScanTransform") {
    val test_sql =
      """
        |SELECT
        | l_returnflag,
        | l_linestatus,
        | sum(l_quantity) AS sum_qty
        |FROM
        | lineitem
        |WHERE
        | l_shipdate <= date'1998-09-02' - interval 1 day
        |GROUP BY
        | l_returnflag,
        | l_linestatus
        |""".stripMargin

    // Reference result computed without any local-property tweaking.
    val expectedAnswer = sql(test_sql).collect()

    // Case 1: produce the RDD with the topmost columnar-to-row removed.
    // Use the helper instead of raw setLocalProperty, and reset in a finally
    // block so a failing query cannot leak the property into later tests.
    val node =
      try {
        CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(value = true, spark)
        val data = sql(test_sql)
        LogicalRDD.fromDataset(
          rdd = data.queryExecution.toRdd,
          originDataset = data,
          isStreaming = false)
      } finally {
        CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(value = false, spark)
      }

    val df = Dataset.ofRows(spark, node).toDF()
    checkAnswer(df, expectedAnswer)

    // Exactly one offloaded RDD scan is expected in the executed plan.
    val transformers =
      df.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true }
    assertResult(1)(transformers.size)

    // Case 2: produce the RDD with the property disabled; the scan must still offload.
    val data2 = sql(test_sql)
    val node2 = LogicalRDD.fromDataset(
      rdd = data2.queryExecution.toRdd,
      originDataset = data2,
      isStreaming = false)

    val df2 = Dataset.ofRows(data2.sparkSession, node2).toDF()
    checkAnswer(df2, expectedAnswer)
    val transformers2 =
      df2.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true }
    assertResult(1)(transformers2.size)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.gluten.component

import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.execution.OffloadKafkaScan
import org.apache.gluten.extension.columnar.KafkaMiscColumnarRules.RemoveStreamingTopmostColumnarToRow
import org.apache.gluten.extension.injector.Injector

class CHKafkaComponent extends Component {
Expand All @@ -28,5 +29,6 @@ class CHKafkaComponent extends Component {
override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil
override def injectRules(injector: Injector): Unit = {
OffloadKafkaScan.inject(injector)
injector.gluten.legacy.injectPost(c => RemoveStreamingTopmostColumnarToRow(c.session, c.caller.isStreaming()))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension.columnar

import org.apache.gluten.execution.MicroBatchScanExecTransformer
import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.FakeRowAdaptor

object KafkaMiscColumnarRules {

  /**
   * Removes the topmost columnar-to-row transition from a streaming plan.
   *
   * Applies only when the plan is a streaming plan AND it contains an offloaded
   * [[MicroBatchScanExecTransformer]]; the stripped child is wrapped in a
   * [[FakeRowAdaptor]] so the plan still advertises row-based output.
   *
   * @param session         active Spark session (unused directly, kept for rule-injection API)
   * @param isStreamingPlan whether the caller is compiling a streaming plan
   */
  case class RemoveStreamingTopmostColumnarToRow(session: SparkSession, isStreamingPlan: Boolean)
    extends Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = {
      // Expression-style condition instead of an early `return` (non-idiomatic Scala).
      val applicable = isStreamingPlan &&
        plan.collectFirst { case e: MicroBatchScanExecTransformer => e }.isDefined
      if (!applicable) {
        plan
      } else {
        plan match {
          // Inlined the former one-line wrapperFakeRowAdaptor helper.
          case ColumnarToRowLike(child) => FakeRowAdaptor(child)
          case other => other
        }
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ private native long nativeCreate(

private native void nativeFlush(long instance);

// Native entry point that writes the native block at `block` to `stream` using `buf`
// as a staging buffer of `size` bytes. Semantics of the returned long are defined on
// the native side — presumably bytes written or a status code; TODO confirm in the JNI impl.
public static native long directWrite(OutputStream stream, byte[] buf, int size, long block);

public void write(ColumnarBatch cb) {
if (cb.numCols() == 0 || cb.numRows() == 0) return;
CHNativeBlock block = CHNativeBlock.fromColumnarBatch(cb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public BlockStats getBlockStats(int columnPosition) {
return nativeBlockStats(blockAddress, columnPosition);
}

// Native deep copy of the block at `blockAddress`; returns the address of the new block.
// NOTE(review): the copy is a separate native allocation — caller appears responsible for
// closing it; confirm ownership contract against the JNI implementation.
public static native long copyBlock(long blockAddress);

// Returns a ColumnarBatch backed by a freshly copied native block, independent of this one.
public ColumnarBatch copyColumnarBatch() {
return new CHNativeBlock(copyBlock(blockAddress)).toColumnarBatch();
}

public void close() {
if (blockAddress != 0) {
nativeClose(blockAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ private static native long createNativeShuffleReader(

private native long nativeNext(long nativeShuffleReader);

// Native entry point that reads one serialized block from `inputStream` using `buffer`
// (of `bufferSize` bytes) as staging space and returns the native block address.
// Counterpart of BlockOutputStream.directWrite — presumably; TODO confirm in the JNI impl.
public static native long directRead(InputStream inputStream, byte[] buffer, int bufferSize);

public CHNativeBlock next() {
long block = nativeNext(nativeShuffleReader);
return new CHNativeBlock(block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ object CHRuleApi {

// Gluten columnar: Post rules.
injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
injector.injectPost(c => CHRemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => intercept(each(c.session))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer
import org.apache.gluten.vectorized.{BlockOutputStream, CHColumnarBatchSerializer, CHNativeBlock, CHStreamReader}

import org.apache.spark.ShuffleDependency
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -58,6 +58,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.commons.lang3.ClassUtils

import java.io.{ObjectInputStream, ObjectOutputStream}
import java.lang.{Long => JLong}
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}

Expand Down Expand Up @@ -525,6 +526,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
wrapChild(union)
case ordered: TakeOrderedAndProjectExecTransformer =>
wrapChild(ordered)
case rddScan: CHRDDScanTransformer =>
wrapChild(rddScan)
case other =>
throw new GlutenNotSupportException(
s"Not supported operator ${other.nodeName} for BroadcastRelation")
Expand Down Expand Up @@ -965,4 +968,29 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
case co: FlattenedOr => GenericExpressionTransformer(co.name, children, co)
case _ => super.genFlattenedExpressionTransformer(substraitName, children, expr)
}

// The CH backend offloads every RDDScanExec unconditionally (see getRDDScanTransform below).
override def isSupportRDDScanExec(plan: RDDScanExec): Boolean = true

// Replaces a vanilla RDDScanExec with the CH-specific transformer node.
override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
CHRDDScanTransformer.replace(plan)

// Deep-copies a columnar batch by copying its backing native CH block
// (CHNativeBlock.copyColumnarBatch allocates a new native block).
override def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
CHNativeBlock.fromColumnarBatch(batch).copyColumnarBatch()

// Serializes a columnar batch to a Java ObjectOutputStream by handing the native
// block address to the native writer (BlockOutputStream.directWrite), staging
// through a byte buffer of CHBackendSettings.customizeBufferSize bytes.
// NOTE(review): a fresh buffer is allocated per call — acceptable unless this sits
// on a hot path; confirm call frequency before pooling.
override def serializeColumnarBatch(output: ObjectOutputStream, batch: ColumnarBatch): Unit = {
val writeBuffer: Array[Byte] =
new Array[Byte](CHBackendSettings.customizeBufferSize)
BlockOutputStream.directWrite(
output,
writeBuffer,
CHBackendSettings.customizeBufferSize,
CHNativeBlock.fromColumnarBatch(batch).blockAddress())
}

// Inverse of serializeColumnarBatch: the native reader (CHStreamReader.directRead)
// reconstructs a native block from the stream and returns its address, which is then
// wrapped back into a ColumnarBatch. Buffer size must match the writer's setting.
override def deserializeColumnarBatch(input: ObjectInputStream): ColumnarBatch = {
val bufferSize = CHBackendSettings.customizeBufferSize
val readBuffer: Array[Byte] = new Array[Byte](bufferSize)
val address = CHStreamReader.directRead(input, readBuffer, bufferSize)
new CHNativeBlock(address).toColumnarBatch
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.extension

import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}

// Remove the topmost columnar-to-row conversion
// Primarily for structured streaming and delta deletion vector
//
// Sometimes, the code uses dataFrame.queryExecution.toRdd as the data source.
// queryExecution will use columnar-to-row (c2r) and row-to-columnar (r2c)
// conversions for the next operation.
// This rule aims to eliminate the redundant double conversion.
/**
 * Removes the topmost columnar-to-row transition when the caller opted in via the
 * `gluten.removeTopmostColumnarToRow` local property, wrapping the columnar child in a
 * [[FakeRowAdaptor]]. Exchanges and broadcast query stages keep their c2r because their
 * output is consumed by Spark itself, not by `dataFrame.queryExecution.toRdd` callers.
 *
 * @param session           active Spark session, used to read the local property
 * @param isAdaptiveContext whether this rule runs inside AQE (kept for injection API)
 */
case class CHRemoveTopmostColumnarToRow(session: SparkSession, isAdaptiveContext: Boolean)
  extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    // Expression-style condition instead of an early `return` (non-idiomatic Scala).
    if (!CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(session)) {
      plan
    } else {
      plan match {
        // Keep c2r on top of shuffle/broadcast boundaries — Spark consumes those rows.
        case c2r @ ColumnarToRowLike(_: ShuffleExchangeLike) => c2r
        case c2r @ ColumnarToRowLike(_: BroadcastExchangeLike) => c2r
        case c2r @ ColumnarToRowLike(_: BroadcastQueryStageExec) => c2r
        // Inlined the former one-line wrapperColumnarRowAdaptor helper.
        case ColumnarToRowLike(child) => FakeRowAdaptor(child)
        case other => other
      }
    }
  }
}

object CHRemoveTopmostColumnarToRow {
  // Spark local-property key that toggles removal of the topmost columnar-to-row transition.
  val REMOVE_TOPMOST_COLUMNAR_TO_ROW: String = "gluten.removeTopmostColumnarToRow"

  // True only when the local property is set and parses as "true"
  // (an unparseable value still throws, matching the previous behavior).
  def isRemoveTopmostC2R(spark: SparkSession): Boolean =
    Option(spark.sparkContext.getLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW))
      .fold(false)(_.toBoolean)

  // Sets the local property for the current thread's subsequently submitted jobs.
  def setRemoveTopmostC2R(value: Boolean, spark: SparkSession): Unit =
    spark.sparkContext.setLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW, value.toString)
}
Loading