Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions jvm/src/main/java/com/intel/oap/row/JniInstance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.row;

import io.kyligence.jni.engine.LocalEngine;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Java API for in-process profiling. Serves as a wrapper around
* async-profiler native library. This class is a singleton.
* The first call to {@link #getInstance()} initiates loading of
* libasyncProfiler.so.
*/
public class JniInstance {

private static JniInstance instance;

private String currLibPath = "";

private JniInstance() {
}

public static JniInstance getInstance() {
return getInstance(null);
}

public static synchronized JniInstance getInstance(String libPath) {
if (instance != null) {
return instance;
}

File file = null;
boolean libPathExists = false;
if (StringUtils.isNotBlank(libPath)) {
file = new File(libPath);
libPathExists = file.isFile() && file.exists();
}
if (!libPathExists) {
String soFileName = "/liblocal_engine_jnid.so";
try {
InputStream is = JniInstance.class.getResourceAsStream(soFileName);
file = File.createTempFile("lib", ".so");
OutputStream os = new FileOutputStream(file);
byte[] buffer = new byte[128 << 10];
int length;
while ((length = is.read(buffer)) != -1) {
os.write(buffer, 0, length);
}
is.close();
os.close();
} catch (IOException e) {
}
}
if (file != null) {
try {
file.setReadable(true, false);
System.load(file.getAbsolutePath());
libPath = file.getAbsolutePath();
} catch (UnsatisfiedLinkError error) {
throw error;
}
}
instance = new JniInstance();
instance.setCurrLibPath(libPath);
LocalEngine.initEngineEnv();
return instance;
}

public void setCurrLibPath(String currLibPath) {
this.currLibPath = currLibPath;
}

public LocalEngine buildLocalEngine(byte[] substraitPlan) {
LocalEngine localEngine = new LocalEngine(substraitPlan);
return localEngine;
}
}
58 changes: 58 additions & 0 deletions jvm/src/main/java/com/intel/oap/row/RowIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.row;

import io.kyligence.jni.engine.LocalEngine;

import java.io.IOException;

public class RowIterator {

private LocalEngine localEngine;
private boolean closed = false;

public RowIterator() throws IOException {}

public RowIterator(byte[] plan, String soFilePath) throws IOException {
this.localEngine = JniInstance.getInstance(soFilePath).buildLocalEngine(plan);
this.localEngine.execute();
}

public boolean hasNext() throws IOException {
return this.localEngine.hasNext();
}

public SparkRowInfo next() throws IOException {
if (this.localEngine == null) {
return null;
}
return this.localEngine.next();
}

public void close() {
if (!closed) {
if (this.localEngine != null) {
try {
this.localEngine.close();
} catch (IOException e) {
}
}
closed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@
* limitations under the License.
*/

package com.intel.oap.vectorized;
package com.intel.oap.row;

/** POJO to hold the output file path of the designated partition id */
public class PartitionFileInfo {
private final int partitionId;
private final String filePath;
public class SparkRowInfo {
public long[] offsets;
public long[] lengths;
public long memoryAddress;
public long fieldsNum;

public PartitionFileInfo(int partitionId, String filePath) {
this.partitionId = partitionId;
this.filePath = filePath;
}

public int getPartitionId() {
return partitionId;
}

public String getFilePath() {
return filePath;
}
public SparkRowInfo(long[] offsets, long[] lengths, long memoryAddress, long fieldsNum) {
this.offsets = offsets;
this.lengths = lengths;
this.memoryAddress = memoryAddress;
this.fieldsNum = fieldsNum;
}
}
46 changes: 46 additions & 0 deletions jvm/src/main/java/io/kyligence/jni/engine/LocalEngine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.kyligence.jni.engine;

import java.io.Closeable;
import java.io.IOException;

import com.intel.oap.row.SparkRowInfo;

public class LocalEngine implements Closeable {
public static native long test(int a, int b);

public static native void initEngineEnv();

private long nativeExecutor;
private byte[] plan;

public LocalEngine(byte[] plan) {
this.plan = plan;
}

public native void execute();

public native boolean hasNext();

public native SparkRowInfo next();


@Override
public native void close() throws IOException;
}
9 changes: 8 additions & 1 deletion jvm/src/main/scala/com/intel/oap/GazellePluginConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package com.intel.oap

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf

Expand Down Expand Up @@ -128,6 +127,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val enablePreferColumnar: Boolean =
conf.getConfString("spark.oap.sql.columnar.preferColumnar", "true").toBoolean

// This config is used for specifying whether to use columnar basic iterator.
val enableColumnarIterator: Boolean =
conf.getConfString("spark.oap.sql.columnar.iterator", "true").toBoolean

// This config is used for testing. Setting to false will disable loading native libraries.
val loadNative: Boolean =
conf.getConfString("spark.oap.sql.columnar.loadnative", "true").toBoolean
Expand All @@ -136,6 +139,10 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
val nativeLibName: String =
conf.getConfString("spark.oap.sql.columnar.libname", "spark_columnar_jni")

// This config is used for specifying the absolute path of the native library.
val nativeLibPath: String =
conf.getConfString("spark.oap.sql.columnar.libpath", "")

// fallback to row operators if there are several continous joins
val joinOptimizationThrottle: Integer =
conf.getConfString("spark.oap.sql.columnar.joinOptimizationLevel", "12").toInt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import com.intel.oap.expression._
import com.intel.oap.substrait.expression.ExpressionNode
import com.intel.oap.substrait.rel.{RelBuilder, RelNode}
import com.intel.oap.substrait.SubstraitContext

import com.intel.oap.GazellePluginConfig
import org.apache.spark.SparkConf

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
Expand All @@ -43,7 +44,7 @@ case class ConditionProjectExecTransformer(

val sparkConf: SparkConf = sparkContext.getConf

override def supportsColumnar: Boolean = true
override def supportsColumnar: Boolean = GazellePluginConfig.getConf.enableColumnarIterator

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package com.intel.oap.execution

import com.intel.oap.GazellePluginConfig
import com.intel.oap.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import com.intel.oap.substrait.rel.{LocalFilesBuilder, RelBuilder}
import com.intel.oap.substrait.rel.RelBuilder
import com.intel.oap.substrait.SubstraitContext
import com.intel.oap.substrait.`type`.TypeBuiler
import com.intel.oap.substrait.expression.{ExpressionBuilder, ExpressionNode}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
Expand All @@ -34,14 +32,16 @@ import org.apache.spark.sql.vectorized.ColumnarBatch

class BatchScanExecTransformer(output: Seq[AttributeReference], @transient scan: Scan)
extends BatchScanExec(output, scan) with TransformSupport {
val tmpDir: String = GazellePluginConfig.getConf.tmpFile

val filterExprs: Seq[Expression] = if (scan.isInstanceOf[FileScan]) {
scan.asInstanceOf[FileScan].dataFilters
} else {
throw new UnsupportedOperationException(s"${scan.getClass.toString} is not supported")
}

override def supportsColumnar(): Boolean = true
override def supportsColumnar(): Boolean =
super.supportsColumnar && GazellePluginConfig.getConf.enableColumnarIterator

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "input_batches"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.intel.oap.expression._
import com.intel.oap.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode}
import com.intel.oap.substrait.rel.{RelBuilder, RelNode}
import com.intel.oap.substrait.SubstraitContext
import com.intel.oap.GazellePluginConfig
import java.util

import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -52,7 +53,7 @@ case class HashAggregateExecTransformer(

val sparkConf = sparkContext.getConf

override def supportsColumnar: Boolean = true
override def supportsColumnar: Boolean = GazellePluginConfig.getConf.enableColumnarIterator

val resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute)

Expand Down Expand Up @@ -89,7 +90,7 @@ case class HashAggregateExecTransformer(

override def doValidate(): Boolean = {
var isPartial = true
aggregateExpressions.toList.foreach(aggExpr => {
aggregateExpressions.foreach(aggExpr => {
aggExpr.mode match {
case Partial =>
case _ => isPartial = false
Expand Down Expand Up @@ -175,7 +176,7 @@ case class HashAggregateExecTransformer(

// Get the aggregate function nodes
val aggregateFunctionList = new util.ArrayList[AggregateFunctionNode]()
groupingExpressions.toList.foreach(expr => {
groupingExpressions.foreach(expr => {
val groupingExpr: Expression = ExpressionConverter
.replaceWithExpressionTransformer(expr, originalInputAttributes)
val exprNode = groupingExpr.asInstanceOf[ExpressionTransformer].doTransform(args)
Expand All @@ -185,7 +186,7 @@ case class HashAggregateExecTransformer(
Lists.newArrayList(exprNode), outputTypeNode)
aggregateFunctionList.add(aggFunctionNode)
})
aggregateExpressions.toList.foreach(aggExpr => {
aggregateExpressions.foreach(aggExpr => {
val aggregatFunc = aggExpr.aggregateFunction
val functionId = AggregateFunctionsBuilder.create(args, aggregatFunc)
val mode = modeToKeyWord(aggExpr.mode)
Expand Down
Loading