Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -151,7 +152,10 @@ void initPropertiesAndConfigs() {
conf.setIntVar(ConfVars.HISTORY_QUERY_CACHE_SIZE, 10);

// Python function path
conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString());
URL pythonUdfURL = getClass().getResource("/pyudf");
if (pythonUdfURL != null) {
conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, pythonUdfURL.toString());
}

// Buffer size
conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,13 @@ public void testMultiBytesDelimiter4() throws Exception {
}
}

@Test
public void testSimpleQueryWithPythonFuncs() throws Exception {
ResultSet res = executeQuery();
assertResultSet(res);
cleanupQuery(res);
}

@Test
public void testSelectPythonFuncs() throws Exception {
ResultSet res = executeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from tajo_util import output_type
from types import StringType

@output_type('int4')
def return_one():
Expand All @@ -34,3 +35,9 @@ def add_py(a,b):
return a+b
else:
return None

def str_len(a):
if a is None:
return 0
else:
return len(a)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select str_len(r_name) from region;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
?str_len
-------------------------------
6
7
4
6
11
0
0
0
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static Optional<List<FunctionDesc>> loadUserDefinedFunctions(TajoConf con
try {
codePath = new Path(codePathStr);
} catch (IllegalArgumentException e) {
LOG.warn(e);
LOG.warn("Illegal function path", e);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ private void initSeqScanExec() throws IOException, TajoException {
new QueryContext(tajoConf), null,
new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
fragmentProtos, null);

if (scanNode.hasTargets()) {
QueryExecutor.startScriptExecutors(taskContext.getQueryContext(),
taskContext.getEvalContext(),
scanNode.getTargets());
}

this.scanExec = new PartitionMergeScanExec(taskContext, scanNode, fragmentProtos);
this.scanExec.init();
} else {
Expand Down Expand Up @@ -155,6 +162,10 @@ public void close() throws IOException {
rowBlock = null;
}

if (taskContext != null) {
QueryExecutor.stopScriptExecutors(taskContext.getEvalContext());
}

//remove temporal final output
if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, queryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.tajo.type.Type;
import org.apache.tajo.util.TUtil;

import java.util.Arrays;

import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_AGGREGATION;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_UDA;

Expand Down Expand Up @@ -149,7 +151,7 @@ public boolean equals(Object obj) {

@Override
public int hashCode() {
return Objects.hashCode(funcDesc, argEvals);
return Objects.hashCode(funcDesc, Arrays.hashCode(argEvals));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static Set<FunctionDesc> registerFunctions(URI path, String namespace) th
}
} catch (Throwable t) {
// ignore invalid functions
LOG.warn(t);
LOG.warn("Cannot parse function " + funcInfo, t);
}
}
return functionDescs;
Expand Down Expand Up @@ -154,6 +154,11 @@ private static class AggFuncInfo implements FunctionInfo {
ScalarFuncInfo mergeInfo;
ScalarFuncInfo getPartialResultInfo;
ScalarFuncInfo getFinalResultInfo;

@Override
public String toString() {
return className + "." + funcName;
}
}

private static class ScalarFuncInfo implements FunctionInfo {
Expand All @@ -171,6 +176,11 @@ public ScalarFuncInfo(String[] quotedSchemaStrings, String funcName, int paramNu
this.funcName = funcName;
this.paramNum = paramNum;
}

@Override
public String toString() {
return funcName;
}
}

// TODO: python parser must be improved.
Expand Down