Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.zetasql.LanguageOptions;
import com.google.zetasql.Value;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
Expand Down Expand Up @@ -63,6 +64,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexSlot;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlOperator;
Expand Down Expand Up @@ -141,11 +143,14 @@ public static Collection<RuleSet> getZetaSqlRuleSets(Collection<RelOptRule> calc
* group is equal to {@code SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS}
*/
static boolean hasOnlyJavaUdfInProjects(RelOptRuleCall x) {
HashSet<Integer> udfs = new HashSet<>();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be scoped inside for (RelNode relNode : resList).

Does anyone know if it's even possible for x to contain multiple rels?

List<RelNode> resList = x.getRelList();
for (RelNode relNode : resList) {
if (relNode instanceof LogicalCalc) {
LogicalCalc logicalCalc = (LogicalCalc) relNode;
for (RexNode rexNode : logicalCalc.getProgram().getExprList()) {
List<RexNode> exprList = logicalCalc.getProgram().getExprList();
for (int i = 0; i < exprList.size(); i++) {
RexNode rexNode = exprList.get(i);
if (rexNode instanceof RexCall) {
RexCall call = (RexCall) rexNode;
final SqlOperator operator = call.getOperator();
Expand All @@ -160,8 +165,10 @@ static boolean hasOnlyJavaUdfInProjects(RelOptRuleCall x) {
SqlUserDefinedFunction udf = (SqlUserDefinedFunction) call.op;
if (udf.function instanceof ZetaSqlScalarFunctionImpl) {
ZetaSqlScalarFunctionImpl scalarFunction = (ZetaSqlScalarFunctionImpl) udf.function;
if (!scalarFunction.functionGroup.equals(
if (scalarFunction.functionGroup.equals(
SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS)) {
udfs.add(i);
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that none of the RexNode in call.getOperands() contains any subexpression?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a case of nested call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. Increment(1 + 1) where Increment is a UDF?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly what I was thinking. If that is true, it is necessary to recursively call on each operand.

Copy link
Member

@kennknowles kennknowles Feb 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, it would be a normal approach to low-level code (like the Program object) to use SSA / A-normal form so that there is no nested expression. I just don't know what Calcite guarantees.

That would cause the project to be turned into:

x = 1 + 1
y = increment(x)

Then recursion is not needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is such form, Calc splitting will become easy to implement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only RexCall has getOperands. I was under the impression that these operands are garenteed to be prior entries returned in ExprList. It will be a little while before I have time to verify. If that is not the case, it is easy to transform the ExprList into that form. That will need to be true for this to be correct and for calc splitting to be easy.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that is how I expected calc splitting to be implemented as well. I thought that the program was constrained to have that form as well.

Didn't you demonstrate an example like increment(1 + 1) where the + was executed by BeamCalcRel? This is what prompted me to read the code to see if the operands were checked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case you describe was solved by #13912.

Programs should be normalized before we get here, so RexCall is guaranteed to only reference previous arguments:
https://github.com/apache/calcite/blob/12a484a5c364c36e9551e59f4dc33bfb219ecf07/core/src/main/java/org/apache/calcite/rex/RexProgramBuilder.java#L507

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Reject ZetaSQL Builtin Scalar Functions
return false;
}
Expand Down Expand Up @@ -205,9 +212,21 @@ static boolean hasOnlyJavaUdfInProjects(RelOptRuleCall x) {
return false;
}
}
for (RexSlot slot : logicalCalc.getProgram().getProjectList()) {
if (!udfs.contains(slot.getIndex())) {
// Reject non-udf project
return false;
}
}
if (logicalCalc.getProgram().getCondition() != null) {
if (!udfs.contains(logicalCalc.getProgram().getCondition().getIndex())) {
// Reject non-udf condition
return false;
}
}
}
}
return true;
return !udfs.isEmpty();
}

/**
Expand Down