diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index 04fe05518b72..be1b27b3e2f9 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -17,9 +17,6 @@ */ package org.apache.beam.dsls.sql; -import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; -import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -47,17 +44,15 @@ Pipeline p = Pipeline.create(options); //create table from TextIO; -TableSchema tableASchema = ...; PCollection inputTableA = p.apply(TextIO.read().from("/my/input/patha")) - .apply(BeamSql.fromTextRow(tableASchema)); -TableSchema tableBSchema = ...; + .apply(...); PCollection inputTableB = p.apply(TextIO.read().from("/my/input/pathb")) - .apply(BeamSql.fromTextRow(tableBSchema)); + .apply(...); //run a simple query, and register the output as a table in BeamSql; String sql1 = "select MY_FUNC(c1), c2 from TABLE_A"; -PCollection outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)) - .withUdf("MY_FUNC", myFunc); +PCollection outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1) + .withUdf("MY_FUNC", myFunc)); //run a JOIN with one table from TextIO, and one table from another query PCollection outputTableB = PCollectionTuple.of( @@ -107,35 +102,47 @@ public static PTransform> query(String */ private static class QueryTransform extends PTransform> { + private BeamSqlEnv sqlEnv; private String sqlQuery; + public QueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; + sqlEnv = new BeamSqlEnv(); + } + + public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) { + this.sqlQuery = sqlQuery; + this.sqlEnv = sqlEnv; } @Override public PCollection expand(PCollectionTuple input) { - //register tables - for (TupleTag sourceTag : input.getAll().keySet()) { - PCollection sourceStream = (PCollection) input.get(sourceTag); - BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); - - registerTable(sourceTag.getId(), - new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); - } + registerTables(input); BeamRelNode beamRelNode = null; try { - beamRelNode = planner.convertToBeamRel(sqlQuery); + beamRelNode = sqlEnv.planner.convertToBeamRel(sqlQuery); } catch (ValidationException | RelConversionException | SqlParseException e) { throw new IllegalStateException(e); } try { - return beamRelNode.buildBeamPipeline(input); + return beamRelNode.buildBeamPipeline(input, sqlEnv); } catch (Exception e) { throw new IllegalStateException(e); } } + + //register tables, related with input PCollections. + private void registerTables(PCollectionTuple input){ + for (TupleTag sourceTag : input.getAll().keySet()) { + PCollection sourceStream = (PCollection) input.get(sourceTag); + BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); + + sqlEnv.registerTable(sourceTag.getId(), + new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); + } + } } /** @@ -144,21 +151,23 @@ public PCollection expand(PCollectionTuple input) { */ private static class SimpleQueryTransform extends PTransform, PCollection> { + BeamSqlEnv sqlEnv = new BeamSqlEnv(); private String sqlQuery; + public SimpleQueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; } - public SimpleQueryTransform withUdf(String udfName){ - throw new UnsupportedOperationException("Pending for UDF support"); - } +// public SimpleQueryTransform withUdf(String udfName){ +// throw new UnsupportedOperationException("Pending for UDF support"); +// } @Override public PCollection expand(PCollection input) { SqlNode sqlNode; try { - sqlNode = planner.parseQuery(sqlQuery); - planner.getPlanner().close(); + sqlNode = sqlEnv.planner.parseQuery(sqlQuery); + sqlEnv.planner.getPlanner().close(); } catch (SqlParseException e) { throw new IllegalStateException(e); } @@ -167,7 +176,7 @@ public PCollection expand(PCollection input) { SqlSelect select = (SqlSelect) sqlNode; String tableName = select.getFrom().toString(); return PCollectionTuple.of(new TupleTag(tableName), input) - .apply(BeamSql.query(sqlQuery)); + .apply(new QueryTransform(sqlQuery, sqlEnv)); } else { throw new UnsupportedOperationException( "Sql operation: " + sqlNode.toString() + " is not supported!"); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java index dbf9a5978105..50da244ec1a2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java @@ -17,8 +17,6 @@ */ package org.apache.beam.dsls.sql; -import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; @@ -33,12 +31,11 @@ */ @Experimental public class BeamSqlCli { - /** * Returns a human readable representation of the query execution plan. */ - public static String explainQuery(String sqlString) throws Exception { - BeamRelNode exeTree = planner.convertToBeamRel(sqlString); + public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { + BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString); String beamPlan = RelOptUtil.toString(exeTree); return beamPlan; } @@ -46,22 +43,23 @@ public static String explainQuery(String sqlString) throws Exception { /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection compilePipeline(String sqlStatement) throws Exception{ + public static PCollection compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) + throws Exception{ PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() .as(PipelineOptions.class); // FlinkPipelineOptions.class options.setJobName("BeamPlanCreator"); Pipeline pipeline = Pipeline.create(options); - return compilePipeline(sqlStatement, pipeline); + return compilePipeline(sqlStatement, pipeline, sqlEnv); } /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection compilePipeline(String sqlStatement, Pipeline basePipeline) - throws Exception{ + public static PCollection compilePipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception{ PCollection resultStream = - planner.compileBeamPipeline(sqlStatement, basePipeline); + sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); return resultStream; } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java index d7715c74c82f..baa2617d9fee 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -42,10 +42,10 @@ * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. */ public class BeamSqlEnv { - static SchemaPlus schema; - static BeamQueryPlanner planner; + SchemaPlus schema; + BeamQueryPlanner planner; - static { + public BeamSqlEnv() { schema = Frameworks.createRootSchema(true); planner = new BeamQueryPlanner(schema); } @@ -53,7 +53,7 @@ public class BeamSqlEnv { /** * Register a UDF function which can be used in SQL expression. */ - public static void registerUdf(String functionName, Class clazz, String methodName) { + public void registerUdf(String functionName, Class clazz, String methodName) { schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName)); } @@ -61,7 +61,7 @@ public static void registerUdf(String functionName, Class clazz, String metho * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. * */ - public static void registerTable(String tableName, BaseBeamTable table) { + public void registerTable(String tableName, BaseBeamTable table) { schema.add(tableName, new BeamCalciteTable(table.getRecordType())); planner.getSourceTables().put(tableName, table); } @@ -69,7 +69,7 @@ public static void registerTable(String tableName, BaseBeamTable table) { /** * Find {@link BaseBeamTable} by table name. */ - public static BaseBeamTable findTable(String tableName){ + public BaseBeamTable findTable(String tableName){ return planner.getSourceTables().get(tableName); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 31f8302ab188..5f09fdd2737d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -63,13 +63,13 @@ public static void main(String[] args) throws Exception { //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection outputStream = inputTable.apply( - BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1")); + BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1")); //log out the output record; outputStream.apply("log_result", MapElements.via(new SimpleFunction() { public Void apply(BeamSqlRow input) { - System.out.println("TABLE_A: " + input); + System.out.println("PCOLLECTION: " + input); return null; } })); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java index 11b867aed2ee..f79bcf633131 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java @@ -57,7 +57,6 @@ public abstract BeamSqlPrimitive calculate(BeamSqlPrimitive le /** * The method to check whether operands are numeric or not. - * @param opType */ public boolean isOperandNumeric(SqlTypeName opType) { return SqlTypeName.NUMERIC_TYPES.contains(opType); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 2eaf9e783baf..6ae8a1eb278c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; @@ -106,12 +107,12 @@ public SqlNode parseQuery(String sqlQuery) throws SqlParseException{ * which is linked with the given {@code pipeline}. The final output stream is returned as * {@code PCollection} so more operations can be applied. */ - public PCollection compileBeamPipeline(String sqlStatement, Pipeline basePipeline) - throws Exception { + public PCollection compileBeamPipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception { BeamRelNode relNode = convertToBeamRel(sqlStatement); // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel. - return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline)); + return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv); } /** diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index bcdc44f32093..7a1d003a239d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; @@ -72,13 +73,13 @@ public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); if (windowFieldIdx != -1) { upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index 40fe05c5b8e4..07b5c7cf26c2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.rel; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -49,14 +50,13 @@ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java index 88fff637de0b..58539f871677 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -56,18 +56,17 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { * which is the persisted PCollection. */ @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName); + BaseBeamTable targetTable = sqlEnv.findTable(sourceName); PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter()); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index ed2bf1220905..a664ce1bef28 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -40,9 +40,8 @@ public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); String stageName = BeamSqlRelUtils.getStageName(this); @@ -55,7 +54,7 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti return sourceStream; } else { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). - BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName); + BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); return sourceTable.buildIOReader(inputPCollections.getPipeline()); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java index 01e1c336b76a..7cab171eacec 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -51,8 +51,8 @@ public BeamIntersectRel( return new BeamIntersectRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java index bee6c11ac475..b558f4ba8909 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -49,8 +49,8 @@ public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List inp return new BeamMinusRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index e6331c6fc7f8..2cdfc720af26 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -18,6 +18,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -59,13 +60,13 @@ public Project copy(RelTraitSet traitSet, RelNode input, List projects, } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java index aed4b06b0773..d4c98a382843 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java @@ -17,14 +17,14 @@ */ package org.apache.beam.dsls.sql.rel; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.rel.RelNode; /** - * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added, it's - * called by {@code BeamQueryPlanner}. + * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added. */ public interface BeamRelNode extends RelNode { @@ -33,5 +33,6 @@ public interface BeamRelNode extends RelNode { * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) * algorithm. */ - PCollection buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception; + PCollection buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) + throws Exception; } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java index 3d41e3ad4417..939c9c86f840 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java @@ -20,7 +20,7 @@ import java.io.Serializable; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms; import org.apache.beam.sdk.transforms.MapElements; @@ -62,12 +62,12 @@ public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, this.all = all; } - public PCollection buildBeamPipeline( - PCollectionTuple inputPCollections) throws Exception { + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { PCollection leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); PCollection rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java index 6c7be0b3d65b..75f9717c08b7 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.utils.CalciteUtils; @@ -119,11 +120,11 @@ public BeamSortRel( } } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); PCollection upstream = BeamSqlRelUtils.getBeamRelInput(input) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); Type windowType = upstream.getWindowingStrategy().getWindowFn() .getWindowTypeDescriptor().getType(); if (!windowType.equals(GlobalWindow.class)) { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java index 63cf11afa127..c661585c692c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; @@ -81,8 +81,8 @@ public BeamUnionRel(RelInput input) { return new BeamUnionRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java index ce7576817984..030d2c85160b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; @@ -57,8 +57,8 @@ public BeamValuesRel( } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { List rows = new ArrayList<>(tuples.size()); String stageName = BeamSqlRelUtils.getStageName(this); if (tuples.isEmpty()) { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java index 69ca44ba31ce..ac395d318621 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java @@ -74,7 +74,6 @@ public static Integer toJavaType(SqlTypeName typeName) { /** * Get the {@code SqlTypeName} for the specified column of a table. - * @return */ public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) { return toCalciteType(schema.getFieldsType().get(index)); diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java index 02223c2acede..47fdc16582e3 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -34,6 +34,8 @@ * Test for {@code BeamIntersectRel}. */ public class BeamIntersectRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable @@ -57,8 +59,8 @@ public class BeamIntersectRelTest { @BeforeClass public static void setUp() { - BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); } @Test @@ -70,7 +72,7 @@ public void testIntersect() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -93,7 +95,7 @@ public void testIntersectAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java index cd6ba163f3d1..688ff8e05234 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -34,6 +34,8 @@ * Test for {@code BeamMinusRel}. */ public class BeamMinusRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable @@ -58,8 +60,8 @@ public class BeamMinusRelTest { @Before public void setUp() { - BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); MockedBeamSqlTable.CONTENT.clear(); } @@ -72,7 +74,7 @@ public void testExcept() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -93,7 +95,7 @@ public void testExceptAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java index 4936062bf0c0..f10a767f6654 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java @@ -42,6 +42,8 @@ * Test for {@code BeamSetOperatorRelBase}. */ public class BeamSetOperatorRelBaseTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); public static final Date THE_DATE = new Date(); @@ -57,7 +59,7 @@ public class BeamSetOperatorRelBaseTest { @BeforeClass public static void prepare() { THE_DATE.setTime(100000); - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); } @Test @@ -71,7 +73,7 @@ public void testSameWindow() throws Exception { + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); List expRows = MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -100,7 +102,7 @@ public void testDifferentWindows() throws Exception { // use a real pipeline rather than the TestPipeline because we are // testing exceptions, the pipeline will not actually run. Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); - BeamSqlCli.compilePipeline(sql, pipeline1); + BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); pipeline.run(); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index cfdbd5370197..251998499182 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -36,6 +36,8 @@ * Test for {@code BeamSortRel}. */ public class BeamSortRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); @@ -69,7 +71,7 @@ public void testOrderBy_basic() throws Exception { + "ORDER BY order_id asc, site_id desc limit 4"; System.out.println(sql); - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -86,7 +88,7 @@ public void testOrderBy_basic() throws Exception { @Test public void testOrderBy_nullsFirst() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable + sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -96,7 +98,7 @@ public void testOrderBy_nullsFirst() throws Exception { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -106,7 +108,7 @@ public void testOrderBy_nullsFirst() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -124,7 +126,7 @@ public void testOrderBy_nullsFirst() throws Exception { @Test public void testOrderBy_nullsLast() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable + sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -134,7 +136,7 @@ public void testOrderBy_nullsLast() throws Exception { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -144,7 +146,7 @@ public void testOrderBy_nullsLast() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -167,7 +169,7 @@ public void testOrderBy_with_offset() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -190,7 +192,7 @@ public void testOrderBy_bigFetch() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -221,13 +223,13 @@ public void testOrderBy_exception() throws Exception { + "ORDER BY order_id asc limit 11"; TestPipeline pipeline = TestPipeline.create(); - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); } @Before public void prepare() { - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); + sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); MockedBeamSqlTable.CONTENT.clear(); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java index c2a05973a59e..c5aa13224a6c 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java @@ -34,6 +34,8 @@ * Test for {@code BeamUnionRel}. */ public class BeamUnionRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable @@ -46,7 +48,7 @@ public class BeamUnionRelTest { @BeforeClass public static void prepare() { - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); } @Test @@ -58,7 +60,7 @@ public void testUnion() throws Exception { + " order_id, site_id, price " + "FROM ORDER_DETAILS "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -81,7 +83,7 @@ public void testUnionAll() throws Exception { + " SELECT order_id, site_id, price " + "FROM ORDER_DETAILS"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java index 4557e8e3abe4..9a5070a0ca52 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -35,6 +35,8 @@ * Test for {@code BeamValuesRel}. */ public class BeamValuesRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable stringTable = MockedBeamSqlTable @@ -49,7 +51,7 @@ public class BeamValuesRelTest { public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.VARCHAR, "name", SqlTypeName.VARCHAR, "description", @@ -61,7 +63,7 @@ public void testValues() throws Exception { @Test public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "c0", SqlTypeName.INTEGER, "c1", @@ -73,7 +75,7 @@ public void testValues_castInt() throws Exception { @Test public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "EXPR$0", SqlTypeName.CHAR, "EXPR$1", @@ -84,8 +86,8 @@ public void testValues_onlySelect() throws Exception { @BeforeClass public static void prepareClass() { - BeamSqlEnv.registerTable("string_table", stringTable); - BeamSqlEnv.registerTable("int_table", intTable); + sqlEnv.registerTable("string_table", stringTable); + sqlEnv.registerTable("int_table", intTable); } @Before