Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private BeamJavaUdfCalcRule() {

@Override
public boolean matches(RelOptRuleCall x) {
return ZetaSQLQueryPlanner.hasUdfInProjects(x);
return ZetaSQLQueryPlanner.hasOnlyJavaUdfInProjects(x);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private BeamZetaSqlCalcRule() {

@Override
public boolean matches(RelOptRuleCall x) {
return !ZetaSQLQueryPlanner.hasUdfInProjects(x);
return ZetaSQLQueryPlanner.hasNoJavaUdfInProjects(x);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,44 @@ public static Collection<RuleSet> getZetaSqlRuleSets() {
return modifyRuleSetsForZetaSql(BeamRuleSets.getRuleSets());
}

/** Returns true if the argument contains any user-defined Java functions. */
static boolean hasUdfInProjects(RelOptRuleCall x) {
/**
* Returns true if the arguments only contain user-defined Java functions, otherwise returns
* false. User-defined java functions are in the category whose function group is equal to {@code
* SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS}
*/
static boolean hasOnlyJavaUdfInProjects(RelOptRuleCall x) {
List<RelNode> resList = x.getRelList();
for (RelNode relNode : resList) {
if (relNode instanceof LogicalCalc) {
LogicalCalc logicalCalc = (LogicalCalc) relNode;
for (RexNode rexNode : logicalCalc.getProgram().getExprList()) {
if (rexNode instanceof RexCall) {
RexCall call = (RexCall) rexNode;
if (call.getOperator() instanceof SqlUserDefinedFunction) {
SqlUserDefinedFunction udf = (SqlUserDefinedFunction) call.op;
if (udf.function instanceof ZetaSqlScalarFunctionImpl) {
ZetaSqlScalarFunctionImpl scalarFunction = (ZetaSqlScalarFunctionImpl) udf.function;
if (!scalarFunction.functionGroup.equals(
SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS)) {
return false;
}
} else {
return false;
}
} else {
return false;
}
}
}
}
}
return true;
}

/**
* Returns false if the argument contains any user-defined Java functions, otherwise returns true.
*/
static boolean hasNoJavaUdfInProjects(RelOptRuleCall x) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: positive phrasing avoids double negatives and makes the logic easier to follow, so consider inverting this (and renaming to something like hasJavaUdfInProjects or hasAnyJavaUdfInProjects)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agreed this might be better. But I think it's bit tricky to implement the positive version of hasNoJavaUdfInProjects . The hasNoJavaUdfInProjects is more error prone at this moment (we only need to consider java udf).

List<RelNode> resList = x.getRelList();
for (RelNode relNode : resList) {
if (relNode instanceof LogicalCalc) {
Expand All @@ -129,15 +165,17 @@ static boolean hasUdfInProjects(RelOptRuleCall x) {
SqlUserDefinedFunction udf = (SqlUserDefinedFunction) call.op;
if (udf.function instanceof ZetaSqlScalarFunctionImpl) {
ZetaSqlScalarFunctionImpl scalarFunction = (ZetaSqlScalarFunctionImpl) udf.function;
return scalarFunction.functionGroup.equals(
SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS);
if (scalarFunction.functionGroup.equals(
SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS)) {
return false;
}
}
}
}
}
}
}
return false;
return true;
}

private static Collection<RuleSet> modifyRuleSetsForZetaSql(Collection<RuleSet> ruleSets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner.CannotPlanException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.codehaus.commons.compiler.CompileException;
import org.joda.time.Duration;
Expand Down Expand Up @@ -203,15 +204,10 @@ public void testBinaryJavaUdf() {
+ "SELECT matches(\"a\", \"a\"), 'apple'='beta'",
jarPath);
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

Schema singleField =
Schema.builder().addBooleanField("field1").addBooleanField("field2").build();

PAssert.that(stream)
.containsInAnyOrder(Row.withSchema(singleField).addValues(true, false).build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
thrown.expect(CannotPlanException.class);
thrown.expectMessage(
"There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL.");
zetaSQLQueryPlanner.convertToBeamRel(sql);
}

// TODO(BEAM-11747) Add tests that mix UDFs and builtin functions that rely on the ZetaSQL C++
Expand Down