diff --git a/jvm/src/main/java/com/intel/oap/row/JniInstance.java b/jvm/src/main/java/com/intel/oap/row/JniInstance.java new file mode 100644 index 000000000000..1110648e7ad1 --- /dev/null +++ b/jvm/src/main/java/com/intel/oap/row/JniInstance.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.row; + +import io.kyligence.jni.engine.LocalEngine; +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Java API for in-process profiling. Serves as a wrapper around + * async-profiler native library. This class is a singleton. + * The first call to {@link #getInstance()} initiates loading of + * libasyncProfiler.so. 
+ */ +public class JniInstance { + + private static JniInstance instance; + + private String currLibPath = ""; + + private JniInstance() { + } + + public static JniInstance getInstance() { + return getInstance(null); + } + + public static synchronized JniInstance getInstance(String libPath) { + if (instance != null) { + return instance; + } + + File file = null; + boolean libPathExists = false; + if (StringUtils.isNotBlank(libPath)) { + file = new File(libPath); + libPathExists = file.isFile() && file.exists(); + } + if (!libPathExists) { + String soFileName = "/liblocal_engine_jnid.so"; + try { + InputStream is = JniInstance.class.getResourceAsStream(soFileName); + file = File.createTempFile("lib", ".so"); + OutputStream os = new FileOutputStream(file); + byte[] buffer = new byte[128 << 10]; + int length; + while ((length = is.read(buffer)) != -1) { + os.write(buffer, 0, length); + } + is.close(); + os.close(); + } catch (IOException e) { + } + } + if (file != null) { + try { + file.setReadable(true, false); + System.load(file.getAbsolutePath()); + libPath = file.getAbsolutePath(); + } catch (UnsatisfiedLinkError error) { + throw error; + } + } + instance = new JniInstance(); + instance.setCurrLibPath(libPath); + LocalEngine.initEngineEnv(); + return instance; + } + + public void setCurrLibPath(String currLibPath) { + this.currLibPath = currLibPath; + } + + public LocalEngine buildLocalEngine(byte[] substraitPlan) { + LocalEngine localEngine = new LocalEngine(substraitPlan); + return localEngine; + } +} diff --git a/jvm/src/main/java/com/intel/oap/row/RowIterator.java b/jvm/src/main/java/com/intel/oap/row/RowIterator.java new file mode 100644 index 000000000000..e6b61f23f4d7 --- /dev/null +++ b/jvm/src/main/java/com/intel/oap/row/RowIterator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.row; + +import io.kyligence.jni.engine.LocalEngine; + +import java.io.IOException; + +public class RowIterator { + + private LocalEngine localEngine; + private boolean closed = false; + + public RowIterator() throws IOException {} + + public RowIterator(byte[] plan, String soFilePath) throws IOException { + this.localEngine = JniInstance.getInstance(soFilePath).buildLocalEngine(plan); + this.localEngine.execute(); + } + + public boolean hasNext() throws IOException { + return this.localEngine.hasNext(); + } + + public SparkRowInfo next() throws IOException { + if (this.localEngine == null) { + return null; + } + return this.localEngine.next(); + } + + public void close() { + if (!closed) { + if (this.localEngine != null) { + try { + this.localEngine.close(); + } catch (IOException e) { + } + } + closed = true; + } + } +} diff --git a/jvm/src/main/java/com/intel/oap/vectorized/PartitionFileInfo.java b/jvm/src/main/java/com/intel/oap/row/SparkRowInfo.java similarity index 63% rename from jvm/src/main/java/com/intel/oap/vectorized/PartitionFileInfo.java rename to jvm/src/main/java/com/intel/oap/row/SparkRowInfo.java index 47e5fd88e516..61a1eaa08851 100644 --- a/jvm/src/main/java/com/intel/oap/vectorized/PartitionFileInfo.java +++ 
/**
 * Plain data holder describing one batch of serialized Spark rows produced by
 * the native engine. The row bytes themselves live in an off-heap buffer; this
 * object only carries the buffer's base address plus per-row offsets/lengths.
 * Instances are constructed by native (JNI) code.
 */
public class SparkRowInfo {
    /** Byte offset of each row relative to {@link #memoryAddress}. */
    public long[] offsets;
    /** Byte length of each row; parallel array to {@link #offsets}. */
    public long[] lengths;
    /** Base address of the off-heap buffer holding the row bytes. */
    public long memoryAddress;
    /** Number of fields (columns) in every row of this batch. */
    public long fieldsNum;

    public SparkRowInfo(long[] rowOffsets, long[] rowLengths, long baseAddress, long numFields) {
        this.offsets = rowOffsets;
        this.lengths = rowLengths;
        this.memoryAddress = baseAddress;
        this.fieldsNum = numFields;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.kyligence.jni.engine; + +import java.io.Closeable; +import java.io.IOException; + +import com.intel.oap.row.SparkRowInfo; + +public class LocalEngine implements Closeable { + public static native long test(int a, int b); + + public static native void initEngineEnv(); + + private long nativeExecutor; + private byte[] plan; + + public LocalEngine(byte[] plan) { + this.plan = plan; + } + + public native void execute(); + + public native boolean hasNext(); + + public native SparkRowInfo next(); + + + @Override + public native void close() throws IOException; +} diff --git a/jvm/src/main/scala/com/intel/oap/GazellePluginConfig.scala b/jvm/src/main/scala/com/intel/oap/GazellePluginConfig.scala index baf75a1003e3..be336b0ac554 100644 --- a/jvm/src/main/scala/com/intel/oap/GazellePluginConfig.scala +++ b/jvm/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -17,7 +17,6 @@ package com.intel.oap -import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf @@ -128,6 +127,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { val enablePreferColumnar: Boolean = conf.getConfString("spark.oap.sql.columnar.preferColumnar", "true").toBoolean + // This config is used for specifying whether to use columnar basic iterator. + val enableColumnarIterator: Boolean = + conf.getConfString("spark.oap.sql.columnar.iterator", "true").toBoolean + // This config is used for testing. Setting to false will disable loading native libraries. val loadNative: Boolean = conf.getConfString("spark.oap.sql.columnar.loadnative", "true").toBoolean @@ -136,6 +139,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging { val nativeLibName: String = conf.getConfString("spark.oap.sql.columnar.libname", "spark_columnar_jni") + // This config is used for specifying the absolute path of the native library. 
+ val nativeLibPath: String = + conf.getConfString("spark.oap.sql.columnar.libpath", "") + // fallback to row operators if there are several continous joins val joinOptimizationThrottle: Integer = conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "12").toInt diff --git a/jvm/src/main/scala/com/intel/oap/execution/BasicPhysicalOperatorTransformer.scala b/jvm/src/main/scala/com/intel/oap/execution/BasicPhysicalOperatorTransformer.scala index 13ac5a25f1eb..43b72a523f28 100644 --- a/jvm/src/main/scala/com/intel/oap/execution/BasicPhysicalOperatorTransformer.scala +++ b/jvm/src/main/scala/com/intel/oap/execution/BasicPhysicalOperatorTransformer.scala @@ -21,8 +21,9 @@ import com.intel.oap.expression._ import com.intel.oap.substrait.expression.ExpressionNode import com.intel.oap.substrait.rel.{RelBuilder, RelNode} import com.intel.oap.substrait.SubstraitContext - +import com.intel.oap.GazellePluginConfig import org.apache.spark.SparkConf + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ @@ -43,7 +44,7 @@ case class ConditionProjectExecTransformer( val sparkConf: SparkConf = sparkContext.getConf - override def supportsColumnar: Boolean = true + override def supportsColumnar: Boolean = GazellePluginConfig.getConf.enableColumnarIterator override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), diff --git a/jvm/src/main/scala/com/intel/oap/execution/BatchScanExecTransformer.scala b/jvm/src/main/scala/com/intel/oap/execution/BatchScanExecTransformer.scala index 80dd17cdae2b..f82028469392 100644 --- a/jvm/src/main/scala/com/intel/oap/execution/BatchScanExecTransformer.scala +++ b/jvm/src/main/scala/com/intel/oap/execution/BatchScanExecTransformer.scala @@ -19,10 +19,8 @@ package com.intel.oap.execution import com.intel.oap.GazellePluginConfig import com.intel.oap.expression.{ConverterUtils, ExpressionConverter, 
ExpressionTransformer} -import com.intel.oap.substrait.rel.{LocalFilesBuilder, RelBuilder} +import com.intel.oap.substrait.rel.RelBuilder import com.intel.oap.substrait.SubstraitContext -import com.intel.oap.substrait.`type`.TypeBuiler -import com.intel.oap.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ @@ -34,14 +32,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch class BatchScanExecTransformer(output: Seq[AttributeReference], @transient scan: Scan) extends BatchScanExec(output, scan) with TransformSupport { - val tmpDir: String = GazellePluginConfig.getConf.tmpFile + val filterExprs: Seq[Expression] = if (scan.isInstanceOf[FileScan]) { scan.asInstanceOf[FileScan].dataFilters } else { throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported") } - override def supportsColumnar(): Boolean = true + override def supportsColumnar(): Boolean = + super.supportsColumnar && GazellePluginConfig.getConf.enableColumnarIterator + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"), diff --git a/jvm/src/main/scala/com/intel/oap/execution/HashAggregateExecTransformer.scala b/jvm/src/main/scala/com/intel/oap/execution/HashAggregateExecTransformer.scala index acdf18a936c8..e0c49d261ad2 100644 --- a/jvm/src/main/scala/com/intel/oap/execution/HashAggregateExecTransformer.scala +++ b/jvm/src/main/scala/com/intel/oap/execution/HashAggregateExecTransformer.scala @@ -24,6 +24,7 @@ import com.intel.oap.expression._ import com.intel.oap.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode} import com.intel.oap.substrait.rel.{RelBuilder, RelNode} import com.intel.oap.substrait.SubstraitContext +import com.intel.oap.GazellePluginConfig import java.util import org.apache.spark.rdd.RDD @@ -52,7 +53,7 
@@ case class HashAggregateExecTransformer( val sparkConf = sparkContext.getConf - override def supportsColumnar: Boolean = true + override def supportsColumnar: Boolean = GazellePluginConfig.getConf.enableColumnarIterator val resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute) @@ -89,7 +90,7 @@ case class HashAggregateExecTransformer( override def doValidate(): Boolean = { var isPartial = true - aggregateExpressions.toList.foreach(aggExpr => { + aggregateExpressions.foreach(aggExpr => { aggExpr.mode match { case Partial => case _ => isPartial = false @@ -175,7 +176,7 @@ case class HashAggregateExecTransformer( // Get the aggregate function nodes val aggregateFunctionList = new util.ArrayList[AggregateFunctionNode]() - groupingExpressions.toList.foreach(expr => { + groupingExpressions.foreach(expr => { val groupingExpr: Expression = ExpressionConverter .replaceWithExpressionTransformer(expr, originalInputAttributes) val exprNode = groupingExpr.asInstanceOf[ExpressionTransformer].doTransform(args) @@ -185,7 +186,7 @@ case class HashAggregateExecTransformer( Lists.newArrayList(exprNode), outputTypeNode) aggregateFunctionList.add(aggFunctionNode) }) - aggregateExpressions.toList.foreach(aggExpr => { + aggregateExpressions.foreach(aggExpr => { val aggregatFunc = aggExpr.aggregateFunction val functionId = AggregateFunctionsBuilder.create(args, aggregatFunc) val mode = modeToKeyWord(aggExpr.mode) diff --git a/jvm/src/main/scala/com/intel/oap/execution/NativeWholestageRowRDD.scala b/jvm/src/main/scala/com/intel/oap/execution/NativeWholestageRowRDD.scala new file mode 100644 index 000000000000..42fec932c084 --- /dev/null +++ b/jvm/src/main/scala/com/intel/oap/execution/NativeWholestageRowRDD.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.intel.oap.execution

import java.io._

import com.intel.oap.GazellePluginConfig
import com.intel.oap.row.RowIterator
import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.util._

/**
 * RDD that executes a whole-stage Substrait plan in the native local engine
 * and exposes the produced rows as InternalRows. Each partition carries its
 * own serialized plan (NativeFilePartition.substraitPlan); the native engine
 * returns batches of UnsafeRow bytes in off-heap memory which are surfaced
 * through a reused UnsafeRow pointed at each row in turn.
 */
class WholestageNativeRowRDD(
    sc: SparkContext,
    @transient private val inputPartitions: Seq[InputPartition],
    partitionReaderFactory: PartitionReaderFactory,
    columnarReads: Boolean)
  extends RDD[InternalRow](sc, Nil) {

  val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo
  val loadNative = GazellePluginConfig.getConf.loadNative
  val libName = GazellePluginConfig.getConf.nativeLibName

  override protected def getPartitions: Array[Partition] = {
    inputPartitions.zipWithIndex.map {
      case (inputPartition, index) => new NativeSubstraitPartition(index, inputPartition)
    }.toArray
  }

  // Narrows a generic Partition to our partition type, failing loudly on a
  // scheduler bug rather than with a ClassCastException.
  private def castPartition(split: Partition): NativeSubstraitPartition = split match {
    case p: NativeSubstraitPartition => p
    case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition: $split")
  }

  private def castNativePartition(split: Partition): NativeFilePartition = split match {
    case NativeSubstraitPartition(_, p: NativeFilePartition) => p
    case _ => throw new SparkException(s"[BUG] Not a NativeSubstraitPartition: $split")
  }

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    ExecutorManager.tryTaskSet(numaBindingInfo)

    val inputPartition = castNativePartition(split)

    // The native iterator is only created when native loading is enabled;
    // otherwise this RDD yields no rows.
    var resIter: RowIterator = null
    if (loadNative) {
      resIter = new RowIterator(inputPartition.substraitPlan,
        GazellePluginConfig.getConf.nativeLibPath)
    }

    val iter = new Iterator[InternalRow] with AutoCloseable {
      private[this] var currentIterator: Iterator[InternalRow] = null
      private var totalBatch = 0

      override def hasNext: Boolean = {
        if (loadNative) {
          (currentIterator != null && currentIterator.hasNext) || nextIterator()
        } else {
          false
        }
      }

      // Pulls the next batch from the native engine. Returns false when the
      // stream is exhausted or the batch is empty (end of data).
      private def nextIterator(): Boolean = {
        if (!resIter.hasNext) {
          return false
        }
        val startTime = System.nanoTime()
        val sparkRowInfo = resIter.next()
        totalBatch += 1
        logDebug(s"Fetched batch $totalBatch from native engine in " +
          s"${System.nanoTime() - startTime} ns")
        if (sparkRowInfo.offsets != null && sparkRowInfo.offsets.length > 0) {
          val numRows = sparkRowInfo.offsets.length
          val numFields = sparkRowInfo.fieldsNum
          currentIterator = new Iterator[InternalRow] with AutoCloseable {
            var rowId = 0
            // A single UnsafeRow is reused and re-pointed at each row's
            // off-heap bytes; callers must not retain references across next().
            val row = new UnsafeRow(numFields.intValue())

            override def hasNext: Boolean = rowId < numRows

            override def next(): InternalRow = {
              if (rowId >= numRows) throw new NoSuchElementException
              val (offset, length) = (sparkRowInfo.offsets(rowId), sparkRowInfo.lengths(rowId))
              row.pointTo(null, sparkRowInfo.memoryAddress + offset, length.toInt)
              rowId += 1
              row
            }

            override def close(): Unit = {}
          }
          true
        } else {
          false
        }
      }

      override def next(): InternalRow = {
        if (!hasNext) {
          throw new java.util.NoSuchElementException("End of stream")
        }
        currentIterator.next()
      }

      override def close(): Unit = {
        // resIter is null when loadNative is disabled.
        if (resIter != null) {
          resIter.close()
        }
      }
    }
    // Ensure native resources are released even if the consumer never
    // exhausts or closes the iterator.
    context.addTaskCompletionListener[Unit](_ => iter.close())
    iter
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    castPartition(split).inputPartition.preferredLocations()
  }

}
child.longMetric("numOutputBatches") + val pipelineTime = longMetric("pipelineTime") + + // check if BatchScan exists + val current_op = checkBatchScanExecTransformerChild() + if (current_op.isDefined) { + // If containing batchscan, a new RDD is created. + val batchScan = current_op.get + val wsCxt = doWholestageTransform() + + val startTime = System.nanoTime() + val substraitPlanPartition = batchScan.partitions.map( p => { + p match { + case FilePartition(index, files) => { + val paths = new java.util.ArrayList[String]() + val starts = new java.util.ArrayList[java.lang.Long]() + val lengths = new java.util.ArrayList[java.lang.Long]() + files.foreach { f => + paths.add(f.filePath) + starts.add(new java.lang.Long(f.start)) + lengths.add(new java.lang.Long(f.length)) + } + + val localFilesNode = LocalFilesBuilder.makeLocalFiles(index, paths, starts, lengths) + wsCxt.substraitContext.setLocalFilesNode(localFilesNode) + val substraitPlan = wsCxt.root.toProtobuf + /*val out = new DataOutputStream(new FileOutputStream("/tmp/SubStraitTest-Q6.dat", + false)); + out.write(substraitPlan.toByteArray()); + out.flush();*/ + + //logWarning(s"The substrait plan for partition ${index}:\n${substraitPlan.toString}") + NativeFilePartition(index, files, substraitPlan.toByteArray) + } + case _ => p + } + }) + logWarning( + s"Generated substrait plan tooks: ${(System.nanoTime() - startTime) / 1000000} ms") + + val wsRDD = new WholestageNativeRowRDD(sparkContext, substraitPlanPartition, + batchScan.readerFactory, true) + wsRDD + } else { + sparkContext.emptyRDD + } } + override def doExecuteColumnar(): RDD[ColumnarBatch] = { val signature = doBuild() val listJars = uploadAndListJars(signature) @@ -295,173 +357,10 @@ case class WholeStageTransformerExec(child: SparkPlan)(val transformStageId: Int val buildRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer() val serializableObjectHolder: ListBuffer[SerializableObject] = ListBuffer() val relationHolder: 
ListBuffer[ColumnarHashedRelation] = ListBuffer() -// var idx = 0 -// while (idx < buildPlans.length) { -// -// val curPlan = buildPlans(idx)._1 -// val parentPlan = buildPlans(idx)._2 - -// curRDD = curPlan match { -// case p: ColumnarBroadcastHashJoinExec => -// val fetchTime = p.longMetric("fetchTime") -// val buildTime = p.longMetric("buildTime") -// val buildPlan = p.getBuildPlan -// val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]() -// curRDD.mapPartitions { iter => -// GazellePluginConfig.getConf -// ExecutorManager.tryTaskSet(numaBindingInfo) -// // received broadcast value contain a hashmap and raw recordBatch -// val beforeFetch = System.nanoTime() -// val relation = buildInputByteBuf.value.asReadOnlyCopy() -// relationHolder += relation -// fetchTime += ((System.nanoTime() - beforeFetch) / 1000000) -// val beforeEval = System.nanoTime() -// val hashRelationObject = relation.hashRelationObj -// serializableObjectHolder += hashRelationObject -// val depIter = -// new CloseableColumnBatchIterator(relation.getColumnarBatchAsIter) -// val ctx = curPlan.asInstanceOf[TransformSupport].dependentPlanCtx -// val expression = -// TreeBuilder.makeExpression( -// ctx.root, -// Field.nullable("result", new ArrowType.Int(32, true))) -// val hashRelationKernel = new ExpressionEvaluator() -// hashRelationKernel.build(ctx.inputSchema, Lists.newArrayList(expression), true) -// val hashRelationResultIterator = hashRelationKernel.finishByIterator() -// dependentKernelIterators += hashRelationResultIterator -// // we need to set original recordBatch to hashRelationKernel -// while (depIter.hasNext) { -// val dep_cb = depIter.next() -// if (dep_cb.numRows > 0) { -// (0 until dep_cb.numCols).toList.foreach(i => -// dep_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain()) -// buildRelationBatchHolder += dep_cb -// val dep_rb = ConverterUtils.createArrowRecordBatch(dep_cb) -// hashRelationResultIterator.processAndCacheOne(ctx.inputSchema, 
dep_rb) -// ConverterUtils.releaseArrowRecordBatch(dep_rb) -// } -// } -// // we need to set hashRelationObject to hashRelationResultIterator -// hashRelationResultIterator.setHashRelationObject(hashRelationObject) -// build_elapse += (System.nanoTime() - beforeEval) -// buildTime += ((System.nanoTime() - beforeEval) / 1000000) -// dependentKernels += hashRelationKernel -// iter -// } -// case p: ColumnarShuffledHashJoinExec => -// val buildTime = p.longMetric("buildTime") -// val buildPlan = p.getBuildPlan -// curRDD.zipPartitions(buildPlan.executeColumnar()) { (iter, depIter) => -// ExecutorManager.tryTaskSet(numaBindingInfo) -// val ctx = curPlan.asInstanceOf[TransformSupport].dependentPlanCtx -// val expression = -// TreeBuilder.makeExpression( -// ctx.root, -// Field.nullable("result", new ArrowType.Int(32, true))) -// val hashRelationKernel = new ExpressionEvaluator() -// hashRelationKernel.build(ctx.inputSchema, Lists.newArrayList(expression), true) -// var build_elapse_internal: Long = 0 -// while (depIter.hasNext) { -// val dep_cb = depIter.next() -// if (dep_cb.numRows > 0) { -// (0 until dep_cb.numCols).toList.foreach(i => -// dep_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain()) -// buildRelationBatchHolder += dep_cb -// val beforeEval = System.nanoTime() -// val dep_rb = ConverterUtils.createArrowRecordBatch(dep_cb) -// hashRelationKernel.evaluate(dep_rb) -// ConverterUtils.releaseArrowRecordBatch(dep_rb) -// build_elapse += System.nanoTime() - beforeEval -// build_elapse_internal += System.nanoTime() - beforeEval -// } -// } -// buildTime += (build_elapse_internal / 1000000) -// dependentKernels += hashRelationKernel -// dependentKernelIterators += hashRelationKernel.finishByIterator() -// iter -// } -// case other => -// /* we should cache result from this operator */ -// curRDD.zipPartitions(other.executeColumnar()) { (iter, depIter) => -// ExecutorManager.tryTaskSet(numaBindingInfo) -// val curOutput = other match { -// case p: 
ColumnarSortMergeJoinExec => p.output_skip_alias -// case p: ColumnarBroadcastHashJoinExec => p.output_skip_alias -// case p: ColumnarShuffledHashJoinExec => p.output_skip_alias -// case p => p.output -// } -// val inputSchema = ConverterUtils.toArrowSchema(curOutput) -// val outputSchema = ConverterUtils.toArrowSchema(curOutput) -// if (!parentPlan.isInstanceOf[ColumnarSortMergeJoinExec]) { -// if (parentPlan == null) { -// throw new UnsupportedOperationException( -// s"Only support use ${other.getClass} as buildPlan in ColumnarSortMergeJoin," + -// s"while this parent Plan is null") -// } else { -// throw new UnsupportedOperationException( -// s"Only support use ${other.getClass} as buildPlan in ColumnarSortMergeJoin," + -// s"while this parent Plan is ${parentPlan.getClass}") -// } -// } -// val parent = parentPlan.asInstanceOf[ColumnarSortMergeJoinExec] -// val keyAttributes = if (other.equals(parent.buildPlan)) { -// parent.buildKeys.map(ConverterUtils.getAttrFromExpr(_)) -// } else { -// parent.streamedKeys.map(ConverterUtils.getAttrFromExpr(_)) -// } -// val cachedFunction = prepareRelationFunction(keyAttributes, curOutput) -// val expression = -// TreeBuilder.makeExpression( -// cachedFunction, -// Field.nullable("result", new ArrowType.Int(32, true))) -// val cachedRelationKernel = new ExpressionEvaluator() -// cachedRelationKernel.build( -// inputSchema, -// Lists.newArrayList(expression), -// outputSchema, -// true) -// -// if (enableColumnarSortMergeJoinLazyRead) { -// // Used as ABI to prevent from serializing buffer data -// val serializedItr = new ColumnarNativeIterator(depIter.asJava) -// cachedRelationKernel.evaluate(serializedItr) -// } else { -// while (depIter.hasNext) { -// val dep_cb = depIter.next() -// if (dep_cb.numRows > 0) { -// (0 until dep_cb.numCols).toList.foreach(i => -// dep_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain()) -// buildRelationBatchHolder += dep_cb -// val dep_rb = 
ConverterUtils.createArrowRecordBatch(dep_cb) -// cachedRelationKernel.evaluate(dep_rb) -// ConverterUtils.releaseArrowRecordBatch(dep_rb) -// } -// } -// } -// dependentKernels += cachedRelationKernel -// val beforeEval = System.nanoTime() -// dependentKernelIterators += cachedRelationKernel.finishByIterator() -// build_elapse += System.nanoTime() - beforeEval -// iter -// } -// } -// idx += 1 -// } // check if BatchScan exists - var current_op = child - while (current_op.isInstanceOf[TransformSupport] && - !current_op.isInstanceOf[BatchScanExecTransformer] && - current_op.asInstanceOf[TransformSupport].getChild != null) { - current_op = current_op.asInstanceOf[TransformSupport].getChild - } - val contains_batchscan = if (current_op != null && - current_op.isInstanceOf[BatchScanExecTransformer]) { - true - } else { - false - } - if (contains_batchscan) { + val current_op = checkBatchScanExecTransformerChild() + if (current_op.isDefined) { // If containing batchscan, a new RDD is created. // TODO: Remove ? 
val execTempDir = GazellePluginConfig.getTempFile @@ -474,7 +373,7 @@ case class WholeStageTransformerExec(child: SparkPlan)(val transformStageId: Int sparkConf) s"${execTempDir}/spark-columnar-plugin-codegen-precompile-${signature}.jar" }) - val batchScan = current_op.asInstanceOf[BatchScanExecTransformer] + val batchScan = current_op.get val wsCxt = doWholestageTransform() val startTime = System.nanoTime() diff --git a/jvm/src/test/scala/com/intel/oap/benchmarks/BenchmarkTest.scala b/jvm/src/test/scala/com/intel/oap/benchmarks/BenchmarkTest.scala index 3465ab5b0a27..8f30278c4c61 100644 --- a/jvm/src/test/scala/com/intel/oap/benchmarks/BenchmarkTest.scala +++ b/jvm/src/test/scala/com/intel/oap/benchmarks/BenchmarkTest.scala @@ -36,7 +36,7 @@ object BenchmarkTest { val resourcePath = rootPath + "../../../src/test/resources/" val dataPath = resourcePath + "/tpch-data/" val queryPath = resourcePath + "/queries/" - (new File(dataPath).getAbsolutePath, "parquet", 1, false, queryPath + "q06.sql", "") + (new File(dataPath).getAbsolutePath, "parquet", 10, false, queryPath + "q06.sql", "") } val sqlStr = Source.fromFile(new File(sqlFilePath), "UTF-8") @@ -68,7 +68,10 @@ object BenchmarkTest { .config("spark.sql.execution.arrow.maxRecordsPerBatch", "20000") .config("spark.oap.sql.columnar.columnartorow", "false") .config("spark.oap.sql.columnar.loadnative", "false") - .config("spark.sql.planChangeLog.level", "info") + .config("spark.oap.sql.columnar.libpath", + "/home/myubuntu/Works/c_cpp_projects/Kyligence-ClickHouse/cmake-build-release/utils/local-engine/liblocal_engine_jni.so") + .config("spark.oap.sql.columnar.iterator", "true") + //.config("spark.sql.planChangeLog.level", "info") .config("spark.sql.columnVector.offheap.enabled", "true") .config("spark.memory.offHeap.enabled", "true") .config("spark.memory.offHeap.size", "6442450944") @@ -105,6 +108,7 @@ object BenchmarkTest { val startTime = System.nanoTime() spark.sql(sql).show(200, false) val tookTime = 
(System.nanoTime() - startTime) / 1000000 + println(s"Execute ${i} time, time: ${tookTime}") tookTimeArr += tookTime }