diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index 2126ceac9d..a7559eca77 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -59,6 +59,7 @@ import java.io.Writer; import java.net.InetSocketAddress; import java.net.URI; +import java.net.URL; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; @@ -151,7 +152,10 @@ void initPropertiesAndConfigs() { conf.setIntVar(ConfVars.HISTORY_QUERY_CACHE_SIZE, 10); // Python function path - conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString()); + URL pythonUdfURL = getClass().getResource("/pyudf"); + if (pythonUdfURL != null) { + conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, pythonUdfURL.toString()); + } // Buffer size conf.setInt(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE.varname, 1); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index d3b646abb2..2e1ac85c78 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -725,6 +725,13 @@ public void testMultiBytesDelimiter4() throws Exception { } } + @Test + public void testSimpleQueryWithPythonFuncs() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + @Test public void testSelectPythonFuncs() throws Exception { ResultSet res = executeQuery(); diff --git a/tajo-core-tests/src/test/resources/python/__init__.py b/tajo-core-tests/src/test/resources/pyudf/__init__.py similarity index 100% rename from tajo-core-tests/src/test/resources/python/__init__.py rename to tajo-core-tests/src/test/resources/pyudf/__init__.py diff --git a/tajo-core-tests/src/test/resources/python/test_funcs.py b/tajo-core-tests/src/test/resources/pyudf/test_funcs.py similarity index 91% rename from tajo-core-tests/src/test/resources/python/test_funcs.py rename to tajo-core-tests/src/test/resources/pyudf/test_funcs.py index df3371cc17..404ceb6e65 100644 --- a/tajo-core-tests/src/test/resources/python/test_funcs.py +++ b/tajo-core-tests/src/test/resources/pyudf/test_funcs.py @@ -15,6 +15,7 @@ # limitations under the License. from tajo_util import output_type +from types import StringType @output_type('int4') def return_one(): @@ -34,3 +35,9 @@ def add_py(a,b): return a+b else: return None + +def str_len(a): + if a is None: + return 0 + else: + return len(a) \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/python/test_funcs2.py b/tajo-core-tests/src/test/resources/pyudf/test_funcs2.py similarity index 100% rename from tajo-core-tests/src/test/resources/python/test_funcs2.py rename to tajo-core-tests/src/test/resources/pyudf/test_funcs2.py diff --git a/tajo-core-tests/src/test/resources/python/test_udaf.py b/tajo-core-tests/src/test/resources/pyudf/test_udaf.py similarity index 100% rename from tajo-core-tests/src/test/resources/python/test_udaf.py rename to tajo-core-tests/src/test/resources/pyudf/test_udaf.py diff --git a/tajo-core-tests/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithPythonFuncs.sql b/tajo-core-tests/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithPythonFuncs.sql new file mode 100644 index 0000000000..5e5910f16a --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithPythonFuncs.sql @@ -0,0 +1 @@ +select str_len(r_name) from region; \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/results/TestSelectQuery/testSimpleQueryWithPythonFuncs.result b/tajo-core-tests/src/test/resources/results/TestSelectQuery/testSimpleQueryWithPythonFuncs.result new file mode 100644 index 0000000000..38069597df --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSelectQuery/testSimpleQueryWithPythonFuncs.result @@ -0,0 +1,10 @@ +?str_len +------------------------------- +6 +7 +4 +6 +11 +0 +0 +0 \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java index c309c64470..a32d5a533b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/FunctionLoader.java @@ -101,7 +101,7 @@ public static Optional> loadUserDefinedFunctions(TajoConf con try { codePath = new Path(codePathStr); } catch (IllegalArgumentException e) { - LOG.warn(e); + LOG.warn("Illegal function path", e); continue; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index e8b8b450c0..70268cb03a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -115,6 +115,13 @@ private void initSeqScanExec() throws IOException, TajoException { new QueryContext(tajoConf), null, new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), fragmentProtos, null); + + if (scanNode.hasTargets()) { + QueryExecutor.startScriptExecutors(taskContext.getQueryContext(), + taskContext.getEvalContext(), + scanNode.getTargets()); + } + this.scanExec = new PartitionMergeScanExec(taskContext, scanNode, fragmentProtos); this.scanExec.init(); } else { @@ -155,6 +162,10 @@ public void close() throws IOException { rowBlock = null; } + if (taskContext != null) { + QueryExecutor.stopScriptExecutors(taskContext.getEvalContext()); + } + //remove temporal final output if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, queryId); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java index 1a4ea8e3d2..575d1585ce 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/FunctionEval.java @@ -32,6 +32,8 @@ import org.apache.tajo.type.Type; import org.apache.tajo.util.TUtil; +import java.util.Arrays; + import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_AGGREGATION; import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_UDA; @@ -149,7 +151,7 @@ public boolean equals(Object obj) { @Override public int hashCode() { - return Objects.hashCode(funcDesc, argEvals); + return Objects.hashCode(funcDesc, Arrays.hashCode(argEvals)); } @Override diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 9db792cfee..e5b5f47531 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -109,7 +109,7 @@ public static Set registerFunctions(URI path, String namespace) th } } catch (Throwable t) { // ignore invalid functions - LOG.warn(t); + LOG.warn("Cannot parse function " + funcInfo, t); } } return functionDescs; @@ -154,6 +154,11 @@ private static class AggFuncInfo implements FunctionInfo { ScalarFuncInfo mergeInfo; ScalarFuncInfo getPartialResultInfo; ScalarFuncInfo getFinalResultInfo; + + @Override + public String toString() { + return className + "." + funcName; + } } private static class ScalarFuncInfo implements FunctionInfo { @@ -171,6 +176,11 @@ public ScalarFuncInfo(String[] quotedSchemaStrings, String funcName, int paramNu this.funcName = funcName; this.paramNum = paramNum; } + + @Override + public String toString() { + return funcName; + } } // TODO: python parser must be improved.