From ae6ace264abfbe87cddc01772277117f8ec17423 Mon Sep 17 00:00:00 2001 From: mingmxu Date: Fri, 16 Jun 2017 18:49:18 -0700 Subject: [PATCH 1/3] restrict the scope of BeamSqlEnv --- .../org/apache/beam/dsls/sql/BeamSql.java | 71 ++++++++++++++----- .../org/apache/beam/dsls/sql/BeamSqlCli.java | 28 +++++--- .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 22 ++++-- .../math/BeamSqlMathBinaryExpression.java | 1 - .../dsls/sql/planner/BeamQueryPlanner.java | 7 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 11 ++- .../beam/dsls/sql/rel/BeamFilterRel.java | 8 +-- .../beam/dsls/sql/rel/BeamIOSinkRel.java | 9 ++- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 7 +- .../beam/dsls/sql/rel/BeamIntersectRel.java | 8 +-- .../beam/dsls/sql/rel/BeamMinusRel.java | 8 +-- .../beam/dsls/sql/rel/BeamProjectRel.java | 10 ++- .../apache/beam/dsls/sql/rel/BeamRelNode.java | 11 ++- .../dsls/sql/rel/BeamSetOperatorRelBase.java | 15 ++-- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 12 +++- .../beam/dsls/sql/rel/BeamUnionRel.java | 8 +-- .../beam/dsls/sql/rel/BeamValuesRel.java | 11 ++- .../beam/dsls/sql/utils/CalciteUtils.java | 1 - .../dsls/sql/rel/BeamIntersectRelTest.java | 10 +-- .../beam/dsls/sql/rel/BeamMinusRelTest.java | 10 +-- .../sql/rel/BeamSetOperatorRelBaseTest.java | 8 ++- .../beam/dsls/sql/rel/BeamSortRelTest.java | 26 +++---- .../beam/dsls/sql/rel/BeamUnionRelTest.java | 8 ++- .../beam/dsls/sql/rel/BeamValuesRelTest.java | 12 ++-- 24 files changed, 210 insertions(+), 112 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index 04fe05518b72..9625eb7c1049 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -16,9 +16,9 @@ * limitations under the License. */ package org.apache.beam.dsls.sql; - -import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; -import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; +// +//import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; +//import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; @@ -105,37 +105,69 @@ public static PTransform> query(String /** * A {@link PTransform} representing an execution plan for a SQL query. */ +//<<<<<<< HEAD private static class QueryTransform extends PTransform> { +//======= +// public static class QueryTransform extends PTransform> { + private BeamSqlEnv sqlEnv; +//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv private String sqlQuery; + public QueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; + sqlEnv = new BeamSqlEnv(); + } + + public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) { + this.sqlQuery = sqlQuery; + this.sqlEnv = sqlEnv; } @Override public PCollection expand(PCollectionTuple input) { - //register tables - for (TupleTag sourceTag : input.getAll().keySet()) { - PCollection sourceStream = (PCollection) input.get(sourceTag); - BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); - - registerTable(sourceTag.getId(), - new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); - } +//<<<<<<< HEAD +// //register tables +// for (TupleTag sourceTag : input.getAll().keySet()) { +// PCollection sourceStream = (PCollection) input.get(sourceTag); +// BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); +// +// registerTable(sourceTag.getId(), +// new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); +// } +// +// BeamRelNode beamRelNode = null; +// try { +// beamRelNode = planner.convertToBeamRel(sqlQuery); +//======= + registerTables(input); BeamRelNode beamRelNode = null; try { - beamRelNode = planner.convertToBeamRel(sqlQuery); + beamRelNode = sqlEnv.planner.convertToBeamRel(sqlQuery); +//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv } catch (ValidationException | RelConversionException | SqlParseException e) { throw new IllegalStateException(e); } try { - return beamRelNode.buildBeamPipeline(input); + return beamRelNode.buildBeamPipeline(input, sqlEnv); } catch (Exception e) { throw new IllegalStateException(e); } } + + //register tables, related with input PCollections. + private void registerTables(PCollectionTuple input){ + for (TupleTag sourceTag : input.getAll().keySet()) { + PCollection sourceStream = (PCollection) input.get(sourceTag); + BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); + + sqlEnv.registerTable(sourceTag.getId(), + new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); + } + } } /** @@ -144,7 +176,9 @@ public PCollection expand(PCollectionTuple input) { */ private static class SimpleQueryTransform extends PTransform, PCollection> { + BeamSqlEnv sqlEnv = new BeamSqlEnv(); private String sqlQuery; + public SimpleQueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; } @@ -157,8 +191,13 @@ public SimpleQueryTransform withUdf(String udfName){ public PCollection expand(PCollection input) { SqlNode sqlNode; try { - sqlNode = planner.parseQuery(sqlQuery); - planner.getPlanner().close(); +//<<<<<<< HEAD +// sqlNode = planner.parseQuery(sqlQuery); +// planner.getPlanner().close(); +//======= + sqlNode = sqlEnv.planner.parseQuery(sqlQuery); + sqlEnv.planner.getPlanner().close(); +//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv } catch (SqlParseException e) { throw new IllegalStateException(e); } @@ -167,7 +206,7 @@ public PCollection expand(PCollection input) { SqlSelect select = (SqlSelect) sqlNode; String tableName = select.getFrom().toString(); return PCollectionTuple.of(new TupleTag(tableName), input) - .apply(BeamSql.query(sqlQuery)); + .apply(new QueryTransform(sqlQuery, sqlEnv)); } else { throw new UnsupportedOperationException( "Sql operation: " + sqlNode.toString() + " is not supported!"); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java index dbf9a5978105..5f79e54a3ac1 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java @@ -17,7 +17,7 @@ */ package org.apache.beam.dsls.sql; -import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; +//import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -33,12 +33,17 @@ */ @Experimental public class BeamSqlCli { - /** * Returns a human readable representation of the query execution plan. */ - public static String explainQuery(String sqlString) throws Exception { - BeamRelNode exeTree = planner.convertToBeamRel(sqlString); +//<<<<<<< HEAD + public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { +// BeamRelNode exeTree = planner.convertToBeamRel(sqlString); +//======= +// public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) +// throws ValidationException, RelConversionException, SqlParseException { + BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString); +//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv String beamPlan = RelOptUtil.toString(exeTree); return beamPlan; } @@ -46,22 +51,27 @@ public static String explainQuery(String sqlString) throws Exception { /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection compilePipeline(String sqlStatement) throws Exception{ + public static PCollection compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) + throws Exception{ PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() .as(PipelineOptions.class); // FlinkPipelineOptions.class options.setJobName("BeamPlanCreator"); Pipeline pipeline = Pipeline.create(options); - return compilePipeline(sqlStatement, pipeline); + return compilePipeline(sqlStatement, pipeline, sqlEnv); } /** * compile SQL, and return a {@link Pipeline}. */ - public static PCollection compilePipeline(String sqlStatement, Pipeline basePipeline) - throws Exception{ + public static PCollection compilePipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception{ PCollection resultStream = - planner.compileBeamPipeline(sqlStatement, basePipeline); +//<<<<<<< HEAD +// planner.compileBeamPipeline(sqlStatement, basePipeline); +//======= + sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); +//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv return resultStream; } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java index d7715c74c82f..1e40188b678d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -42,10 +42,15 @@ * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. */ public class BeamSqlEnv { - static SchemaPlus schema; - static BeamQueryPlanner planner; +//<<<<<<< HEAD +// static SchemaPlus schema; +// static BeamQueryPlanner planner; +//======= + SchemaPlus schema; + BeamQueryPlanner planner; +//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv - static { + public BeamSqlEnv() { schema = Frameworks.createRootSchema(true); planner = new BeamQueryPlanner(schema); } @@ -53,7 +58,7 @@ public class BeamSqlEnv { /** * Register a UDF function which can be used in SQL expression. */ - public static void registerUdf(String functionName, Class clazz, String methodName) { + public void registerUdf(String functionName, Class clazz, String methodName) { schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName)); } @@ -61,15 +66,20 @@ public static void registerUdf(String functionName, Class clazz, String metho * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. * */ - public static void registerTable(String tableName, BaseBeamTable table) { +//<<<<<<< HEAD + public void registerTable(String tableName, BaseBeamTable table) { schema.add(tableName, new BeamCalciteTable(table.getRecordType())); +//======= +// public void registerTable(String tableName, BaseBeamTable table) { +// schema.add(tableName, table); +//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv planner.getSourceTables().put(tableName, table); } /** * Find {@link BaseBeamTable} by table name. */ - public static BaseBeamTable findTable(String tableName){ + public BaseBeamTable findTable(String tableName){ return planner.getSourceTables().get(tableName); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java index 11b867aed2ee..f79bcf633131 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java @@ -57,7 +57,6 @@ public abstract BeamSqlPrimitive calculate(BeamSqlPrimitive le /** * The method to check whether operands are numeric or not. 
- * @param opType */ public boolean isOperandNumeric(SqlTypeName opType) { return SqlTypeName.NUMERIC_TYPES.contains(opType); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 2eaf9e783baf..6ae8a1eb278c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.rel.BeamLogicalConvention; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; @@ -106,12 +107,12 @@ public SqlNode parseQuery(String sqlQuery) throws SqlParseException{ * which is linked with the given {@code pipeline}. The final output stream is returned as * {@code PCollection} so more operations can be applied. */ - public PCollection compileBeamPipeline(String sqlStatement, Pipeline basePipeline) - throws Exception { + public PCollection compileBeamPipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception { BeamRelNode relNode = convertToBeamRel(sqlStatement); // the input PCollectionTuple is empty, and be rebuilt in BeamIOSourceRel. - return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline)); + return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv); } /** diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index bcdc44f32093..b1f4cca3124b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -19,6 +19,11 @@ import java.util.ArrayList; import java.util.List; +//<<<<<<< HEAD +//======= +import org.apache.beam.dsls.sql.BeamSqlEnv; +//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; +//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; @@ -72,13 +77,13 @@ public BeamAggregationRel(RelOptCluster cluster, RelTraitSet traits } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); if (windowFieldIdx != -1) { upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index 40fe05c5b8e4..07b5c7cf26c2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.rel; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -49,14 +50,13 @@ public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java index 88fff637de0b..58539f871677 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -56,18 +56,17 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { * which is the persisted PCollection. 
*/ @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName); + BaseBeamTable targetTable = sqlEnv.findTable(sourceName); PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter()); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index ed2bf1220905..a664ce1bef28 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -40,9 +40,8 @@ public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); String stageName = BeamSqlRelUtils.getStageName(this); @@ -55,7 +54,7 @@ public PCollection buildBeamPipeline(PCollectionTuple inputPCollecti return sourceStream; } else { //If not, the source PColection is provided with BaseBeamTable.buildIOReader(). - BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName); + BaseBeamTable sourceTable = sqlEnv.findTable(sourceName); return sourceTable.buildIOReader(inputPCollections.getPipeline()); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java index 01e1c336b76a..7cab171eacec 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -51,8 +51,8 @@ public BeamIntersectRel( return new BeamIntersectRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java index bee6c11ac475..b558f4ba8909 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; 
import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -49,8 +49,8 @@ public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List inp return new BeamMinusRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index e6331c6fc7f8..4d08bcdea461 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -18,6 +18,10 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; +//<<<<<<< HEAD +//======= +import org.apache.beam.dsls.sql.BeamSqlEnv; +//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -59,13 +63,13 @@ public Project copy(RelTraitSet traitSet, RelNode input, List projects, } @Override - public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); String stageName = BeamSqlRelUtils.getStageName(this); PCollection upstream = - BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections); + BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv); BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java index aed4b06b0773..49e986057f87 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java @@ -17,14 +17,18 @@ */ package org.apache.beam.dsls.sql.rel; +//<<<<<<< HEAD +//======= +import org.apache.beam.dsls.sql.BeamSqlEnv; +//import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; +//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.calcite.rel.RelNode; /** - * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added, it's - * called by {@code BeamQueryPlanner}. + * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added. */ public interface BeamRelNode extends RelNode { @@ -33,5 +37,6 @@ public interface BeamRelNode extends RelNode { * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search) * algorithm. 
*/ - PCollection buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception; + PCollection buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) + throws Exception; } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java index 3d41e3ad4417..4137b10cb5a5 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java @@ -20,7 +20,12 @@ import java.io.Serializable; import java.util.List; - +//<<<<<<< HEAD +// +//======= +import org.apache.beam.dsls.sql.BeamSqlEnv; +//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; +//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms; import org.apache.beam.sdk.transforms.MapElements; @@ -62,12 +67,12 @@ public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType, this.all = all; } - public PCollection buildBeamPipeline( - PCollectionTuple inputPCollections) throws Exception { + public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { PCollection leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0)) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); PCollection rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1)) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn(); WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn(); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java index 6c7be0b3d65b..de40fedb3bf0 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java @@ -24,6 +24,12 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +//<<<<<<< HEAD +//======= +import org.apache.beam.dsls.sql.BeamSqlEnv; +//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; +//import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.utils.CalciteUtils; @@ -119,11 +125,11 @@ public BeamSortRel( } } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { RelNode input = getInput(); PCollection upstream = BeamSqlRelUtils.getBeamRelInput(input) - .buildBeamPipeline(inputPCollections); + .buildBeamPipeline(inputPCollections, sqlEnv); Type windowType = upstream.getWindowingStrategy().getWindowFn() .getWindowTypeDescriptor().getType(); if (!windowType.equals(GlobalWindow.class)) { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java index 63cf11afa127..c661585c692c 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java @@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.rel; import java.util.List; - +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.PCollection; @@ -81,8 +81,8 @@ public BeamUnionRel(RelInput input) { return new BeamUnionRel(getCluster(), traitSet, inputs, all); } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { - return delegate.buildBeamPipeline(inputPCollections); + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { + return delegate.buildBeamPipeline(inputPCollections, sqlEnv); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java index ce7576817984..d4ca38a4c2aa 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java @@ -22,7 +22,12 @@ import java.util.ArrayList; import java.util.List; - +//<<<<<<< HEAD +// +//======= +import org.apache.beam.dsls.sql.BeamSqlEnv; +//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; +//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; @@ -57,8 +62,8 @@ public BeamValuesRel( } - @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections) - throws Exception { + @Override public PCollection buildBeamPipeline(PCollectionTuple inputPCollections + , BeamSqlEnv sqlEnv) throws Exception { List rows = new ArrayList<>(tuples.size()); String stageName = BeamSqlRelUtils.getStageName(this); if (tuples.isEmpty()) { diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java index 69ca44ba31ce..ac395d318621 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java @@ -74,7 +74,6 @@ public static Integer toJavaType(SqlTypeName typeName) { /** * Get the {@code SqlTypeName} for the specified column of a table. - * @return */ public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) { return toCalciteType(schema.getFieldsType().get(index)); diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java index 02223c2acede..47fdc16582e3 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -34,6 +34,8 @@ * Test for {@code BeamIntersectRel}. */ public class BeamIntersectRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable @@ -57,8 +59,8 @@ public class BeamIntersectRelTest { @BeforeClass public static void setUp() { - BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); } @Test @@ -70,7 +72,7 @@ public void testIntersect() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -93,7 +95,7 @@ public void testIntersectAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java index cd6ba163f3d1..688ff8e05234 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -34,6 +34,8 @@ * Test for {@code BeamMinusRel}. 
*/ public class BeamMinusRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable @@ -58,8 +60,8 @@ public class BeamMinusRelTest { @Before public void setUp() { - BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); + sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); MockedBeamSqlTable.CONTENT.clear(); } @@ -72,7 +74,7 @@ public void testExcept() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -93,7 +95,7 @@ public void testExceptAll() throws Exception { + "SELECT order_id, site_id, price " + "FROM ORDER_DETAILS2 "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java index 4936062bf0c0..f10a767f6654 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java @@ -42,6 +42,8 @@ * Test for {@code BeamSetOperatorRelBase}. */ public class BeamSetOperatorRelBaseTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); public static final Date THE_DATE = new Date(); @@ -57,7 +59,7 @@ public class BeamSetOperatorRelBaseTest { @BeforeClass public static void prepare() { THE_DATE.setTime(100000); - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); } @Test @@ -71,7 +73,7 @@ public void testSameWindow() throws Exception { + "FROM ORDER_DETAILS GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); List expRows = MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -100,7 +102,7 @@ public void testDifferentWindows() throws Exception { // use a real pipeline rather than the TestPipeline because we are // testing exceptions, the pipeline will not actually run. Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create()); - BeamSqlCli.compilePipeline(sql, pipeline1); + BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); pipeline.run(); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index cfdbd5370197..251998499182 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -36,6 +36,8 @@ * Test for {@code BeamSortRel}. 
*/ public class BeamSortRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); @@ -69,7 +71,7 @@ public void testOrderBy_basic() throws Exception { + "ORDER BY order_id asc, site_id desc limit 4"; System.out.println(sql); - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -86,7 +88,7 @@ public void testOrderBy_basic() throws Exception { @Test public void testOrderBy_nullsFirst() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable + sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -96,7 +98,7 @@ public void testOrderBy_nullsFirst() throws Exception { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -106,7 +108,7 @@ public void testOrderBy_nullsFirst() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -124,7 +126,7 @@ public void testOrderBy_nullsFirst() throws Exception { @Test public void testOrderBy_nullsLast() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable + sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -134,7 +136,7 @@ public void testOrderBy_nullsLast() throws Exception { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -144,7 +146,7 @@ public void testOrderBy_nullsLast() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -167,7 +169,7 @@ public void testOrderBy_with_offset() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 4 offset 4"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -190,7 +192,7 @@ public void testOrderBy_bigFetch() throws Exception { + "FROM ORDER_DETAILS " + "ORDER BY order_id asc, site_id desc limit 11"; - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); pipeline.run().waitUntilFinish(); assertEquals( @@ -221,13 +223,13 @@ public void testOrderBy_exception() throws Exception { + "ORDER BY order_id asc limit 11"; TestPipeline pipeline = TestPipeline.create(); - BeamSqlCli.compilePipeline(sql, pipeline); + BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); } @Before public void prepare() { - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); + sqlEnv.registerTable("SUB_ORDER_RAM", 
subOrderRamTable); MockedBeamSqlTable.CONTENT.clear(); } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java index c2a05973a59e..c5aa13224a6c 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java @@ -34,6 +34,8 @@ * Test for {@code BeamUnionRel}. */ public class BeamUnionRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable @@ -46,7 +48,7 @@ public class BeamUnionRelTest { @BeforeClass public static void prepare() { - BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); } @Test @@ -58,7 +60,7 @@ public void testUnion() throws Exception { + " order_id, site_id, price " + "FROM ORDER_DETAILS "; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", @@ -81,7 +83,7 @@ public void testUnionAll() throws Exception { + " SELECT order_id, site_id, price " + "FROM ORDER_DETAILS"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java index 4557e8e3abe4..9a5070a0ca52 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -35,6 +35,8 @@ * Test for {@code BeamValuesRel}. 
*/ public class BeamValuesRelTest { + static BeamSqlEnv sqlEnv = new BeamSqlEnv(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); private static MockedBeamSqlTable stringTable = MockedBeamSqlTable @@ -49,7 +51,7 @@ public class BeamValuesRelTest { public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.VARCHAR, "name", SqlTypeName.VARCHAR, "description", @@ -61,7 +63,7 @@ public void testValues() throws Exception { @Test public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "c0", SqlTypeName.INTEGER, "c1", @@ -73,7 +75,7 @@ public void testValues_castInt() throws Exception { @Test public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; - PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "EXPR$0", SqlTypeName.CHAR, "EXPR$1", @@ -84,8 +86,8 @@ public void testValues_onlySelect() throws Exception { @BeforeClass public static void prepareClass() { - BeamSqlEnv.registerTable("string_table", stringTable); - BeamSqlEnv.registerTable("int_table", intTable); + sqlEnv.registerTable("string_table", stringTable); + sqlEnv.registerTable("int_table", intTable); } @Before From f7e4a51da30eb42ca6cc99fc5c079e410834352c Mon Sep 17 00:00:00 2001 From: mingmxu Date: Mon, 19 Jun 2017 18:35:46 -0700 Subject: [PATCH 2/3] update interface of BeamSql.simpleQuery --- .../org/apache/beam/dsls/sql/BeamSql.java | 64 ++++++------------- .../org/apache/beam/dsls/sql/BeamSqlCli.java | 12 ---- .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 10 --- .../beam/dsls/sql/example/BeamSqlExample.java | 4 +- .../beam/dsls/sql/rel/BeamAggregationRel.java | 4 -- .../beam/dsls/sql/rel/BeamProjectRel.java | 3 - .../apache/beam/dsls/sql/rel/BeamRelNode.java | 4 -- .../dsls/sql/rel/BeamSetOperatorRelBase.java | 5 -- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 5 -- .../beam/dsls/sql/rel/BeamValuesRel.java | 5 -- .../dsls/sql/BeamSqlDslAggregationTest.java | 6 +- .../beam/dsls/sql/BeamSqlDslFilterTest.java | 2 +- .../beam/dsls/sql/BeamSqlDslProjectTest.java | 2 +- 13 files changed, 28 insertions(+), 98 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index 9625eb7c1049..551fd33f2c6d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -16,9 +16,6 @@ * limitations under the License. 
*/ package org.apache.beam.dsls.sql; -// -//import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; -//import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable; import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; @@ -47,17 +44,15 @@ Pipeline p = Pipeline.create(options); //create table from TextIO; -TableSchema tableASchema = ...; PCollection inputTableA = p.apply(TextIO.read().from("/my/input/patha")) - .apply(BeamSql.fromTextRow(tableASchema)); -TableSchema tableBSchema = ...; + .apply(...); PCollection inputTableB = p.apply(TextIO.read().from("/my/input/pathb")) - .apply(BeamSql.fromTextRow(tableBSchema)); + .apply(...); //run a simple query, and register the output as a table in BeamSql; -String sql1 = "select MY_FUNC(c1), c2 from TABLE_A"; -PCollection outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)) - .withUdf("MY_FUNC", myFunc); +String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION"; +PCollection outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1) + .withUdf("MY_FUNC", myFunc)); //run a JOIN with one table from TextIO, and one table from another query PCollection outputTableB = PCollectionTuple.of( @@ -105,14 +100,9 @@ public static PTransform> query(String /** * A {@link PTransform} representing an execution plan for a SQL query. */ -//<<<<<<< HEAD private static class QueryTransform extends PTransform> { -//======= -// public static class QueryTransform extends PTransform> { private BeamSqlEnv sqlEnv; -//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv private String sqlQuery; public QueryTransform(String sqlQuery) { @@ -127,26 +117,11 @@ public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) { @Override public PCollection expand(PCollectionTuple input) { -//<<<<<<< HEAD -// //register tables -// for (TupleTag sourceTag : input.getAll().keySet()) { -// PCollection sourceStream = (PCollection) input.get(sourceTag); -// BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); -// -// registerTable(sourceTag.getId(), -// new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); -// } -// -// BeamRelNode beamRelNode = null; -// try { -// beamRelNode = planner.convertToBeamRel(sqlQuery); -//======= registerTables(input); BeamRelNode beamRelNode = null; try { beamRelNode = sqlEnv.planner.convertToBeamRel(sqlQuery); -//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv } catch (ValidationException | RelConversionException | SqlParseException e) { throw new IllegalStateException(e); } @@ -158,7 +133,7 @@ public PCollection expand(PCollectionTuple input) { } } - //register tables, related with input PCollections. + //register tables, related with input PCollections. 
private void registerTables(PCollectionTuple input){ for (TupleTag sourceTag : input.getAll().keySet()) { PCollection sourceStream = (PCollection) input.get(sourceTag); @@ -176,28 +151,24 @@ private void registerTables(PCollectionTuple input){ */ private static class SimpleQueryTransform extends PTransform, PCollection> { + private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; BeamSqlEnv sqlEnv = new BeamSqlEnv(); private String sqlQuery; public SimpleQueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; + validateQuery(); } - public SimpleQueryTransform withUdf(String udfName){ - throw new UnsupportedOperationException("Pending for UDF support"); - } +// public SimpleQueryTransform withUdf(String udfName){ +// throw new UnsupportedOperationException("Pending for UDF support"); +// } - @Override - public PCollection expand(PCollection input) { + private void validateQuery() { SqlNode sqlNode; try { -//<<<<<<< HEAD -// sqlNode = planner.parseQuery(sqlQuery); -// planner.getPlanner().close(); -//======= sqlNode = sqlEnv.planner.parseQuery(sqlQuery); sqlEnv.planner.getPlanner().close(); -//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv } catch (SqlParseException e) { throw new IllegalStateException(e); } @@ -205,12 +176,19 @@ public PCollection expand(PCollection input) { if (sqlNode instanceof SqlSelect) { SqlSelect select = (SqlSelect) sqlNode; String tableName = select.getFrom().toString(); - return PCollectionTuple.of(new TupleTag(tableName), input) - .apply(new QueryTransform(sqlQuery, sqlEnv)); + if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) { + throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME); + } } else { throw new UnsupportedOperationException( "Sql operation: " + sqlNode.toString() + " is not supported!"); } } + + @Override + public PCollection expand(PCollection input) { + return PCollectionTuple.of(new TupleTag(PCOLLECTION_TABLE_NAME), input) + .apply(new QueryTransform(sqlQuery, sqlEnv)); + } } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java index 5f79e54a3ac1..50da244ec1a2 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java @@ -17,8 +17,6 @@ */ package org.apache.beam.dsls.sql; -//import static org.apache.beam.dsls.sql.BeamSqlEnv.planner; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; @@ -36,14 +34,8 @@ public class BeamSqlCli { /** * Returns a human readable representation of the query execution plan. */ -//<<<<<<< HEAD public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { -// BeamRelNode exeTree = planner.convertToBeamRel(sqlString); -//======= -// public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) -// throws ValidationException, RelConversionException, SqlParseException { BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString); -//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv String beamPlan = RelOptUtil.toString(exeTree); return beamPlan; } @@ -67,11 +59,7 @@ public static PCollection compilePipeline(String sqlStatement, BeamS public static PCollection compilePipeline(String sqlStatement, Pipeline basePipeline , BeamSqlEnv sqlEnv) throws Exception{ PCollection resultStream = -//<<<<<<< HEAD -// planner.compileBeamPipeline(sqlStatement, basePipeline); -//======= sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); -//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv return resultStream; } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java index 1e40188b678d..baa2617d9fee 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -42,13 +42,8 @@ * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. */ public class BeamSqlEnv { -//<<<<<<< HEAD -// static SchemaPlus schema; -// static BeamQueryPlanner planner; -//======= SchemaPlus schema; BeamQueryPlanner planner; -//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv public BeamSqlEnv() { schema = Frameworks.createRootSchema(true); @@ -66,13 +61,8 @@ public void registerUdf(String functionName, Class clazz, String methodName) * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. * */ -//<<<<<<< HEAD public void registerTable(String tableName, BaseBeamTable table) { schema.add(tableName, new BeamCalciteTable(table.getRecordType())); -//======= -// public void registerTable(String tableName, BaseBeamTable table) { -// schema.add(tableName, table); -//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv planner.getSourceTables().put(tableName, table); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 31f8302ab188..5f09fdd2737d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -63,13 +63,13 @@ public static void main(String[] args) throws Exception { //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection outputStream = inputTable.apply( - BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1")); + BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1")); //log out the output record; outputStream.apply("log_result", MapElements.via(new SimpleFunction() { public Void apply(BeamSqlRow input) { - System.out.println("TABLE_A: " + input); + System.out.println("PCOLLECTION: " + input); return null; } })); diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index b1f4cca3124b..7a1d003a239d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -19,11 +19,7 @@ import java.util.ArrayList; import java.util.List; -//<<<<<<< HEAD -//======= import org.apache.beam.dsls.sql.BeamSqlEnv; -//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils; -//>>>>>>> eb5852b... 
restrict the scope of BeamSqlEnv
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 4d08bcdea461..2cdfc720af26 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -18,10 +18,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
-//<<<<<<< HEAD
-//=======
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv
 import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index 49e986057f87..d4c98a382843 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -17,11 +17,7 @@
  */
 package org.apache.beam.dsls.sql.rel;
 
-//<<<<<<< HEAD
-//=======
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-//import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
-//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
index 4137b10cb5a5..939c9c86f840 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
@@ -20,12 +20,7 @@
 
 import java.io.Serializable;
 import java.util.List;
-//<<<<<<< HEAD
-//
-//=======
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
-//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
 import org.apache.beam.sdk.transforms.MapElements;
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index de40fedb3bf0..75f9717c08b7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -24,12 +24,7 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-//<<<<<<< HEAD
-//=======
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
-//import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
-//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index d4ca38a4c2aa..030d2c85160b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -22,12 +22,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
-//<<<<<<< HEAD
-//
-//=======
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-//import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
-//>>>>>>> eb5852b... restrict the scope of BeamSqlEnv
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index f7349c6e40cd..b0509ae1a330 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -37,7 +37,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testAggregationWithoutWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2";
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
 
     PCollection result =
         inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
@@ -125,7 +125,7 @@ public void testAggregationFunctions() throws Exception{
    */
   @Test
   public void testDistinct() throws Exception {
-    String sql = "SELECT distinct f_int, f_long FROM TABLE_A ";
+    String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
 
     PCollection result =
         inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
@@ -190,7 +190,7 @@ public void testTumbleWindow() throws Exception {
    */
   @Test
   public void testHopWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION "
         + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
 
     PCollection result = inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
index b68e52696620..254b96d71c3e 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -33,7 +33,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
    */
   @Test
   public void testSingleFilter() throws Exception {
-    String sql = "SELECT * FROM TABLE_A WHERE f_int = 1";
+    String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
 
     PCollection result =
         inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 2998682e16c6..1faa4d0c56a1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -36,7 +36,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
    */
   @Test
   public void testSelectAll() throws Exception {
-    String sql = "SELECT * FROM TABLE_A";
+    String sql = "SELECT * FROM PCOLLECTION";
 
     PCollection result =
         inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));

From 8cb47dd5647c413e965fce1d55dd27757edebeb5 Mon Sep 17 00:00:00 2001
From: mingmxu
Date: Thu, 22 Jun 2017 10:37:22 -0700
Subject: [PATCH 3/3] remove select * from PCOLLECTION to a separated PR.

---
 .../java/org/apache/beam/dsls/sql/BeamSql.java | 18 +++++-------------
 .../dsls/sql/BeamSqlDslAggregationTest.java    |  6 +++---
 .../beam/dsls/sql/BeamSqlDslFilterTest.java    |  2 +-
 .../beam/dsls/sql/BeamSqlDslProjectTest.java   |  2 +-
 4 files changed, 10 insertions(+), 18 deletions(-)

diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 551fd33f2c6d..be1b27b3e2f9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -50,7 +50,7 @@
     .apply(...);
 
 //run a simple query, and register the output as a table in BeamSql;
-String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
+String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
 PCollection outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)
     .withUdf("MY_FUNC", myFunc));
 
@@ -151,20 +151,19 @@ private void registerTables(PCollectionTuple input){
    */
   private static class SimpleQueryTransform
       extends PTransform, PCollection> {
-    private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
     BeamSqlEnv sqlEnv = new BeamSqlEnv();
     private String sqlQuery;
 
     public SimpleQueryTransform(String sqlQuery) {
       this.sqlQuery = sqlQuery;
-      validateQuery();
     }
 
 //    public SimpleQueryTransform withUdf(String udfName){
 //      throw new UnsupportedOperationException("Pending for UDF support");
 //    }
 
-    private void validateQuery() {
+    @Override
+    public PCollection expand(PCollection input) {
       SqlNode sqlNode;
       try {
         sqlNode = sqlEnv.planner.parseQuery(sqlQuery);
@@ -176,19 +175,12 @@ private void validateQuery() {
       if (sqlNode instanceof SqlSelect) {
         SqlSelect select = (SqlSelect) sqlNode;
         String tableName = select.getFrom().toString();
-        if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
-          throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
-        }
+        return PCollectionTuple.of(new TupleTag(tableName), input)
+            .apply(new QueryTransform(sqlQuery, sqlEnv));
       } else {
         throw new UnsupportedOperationException(
             "Sql operation: " + sqlNode.toString() + " is not supported!");
       }
     }
-
-    @Override
-    public PCollection expand(PCollection input) {
-      return PCollectionTuple.of(new TupleTag(PCOLLECTION_TABLE_NAME), input)
-          .apply(new QueryTransform(sqlQuery, sqlEnv));
-    }
   }
 }
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index b0509ae1a330..f7349c6e40cd 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -37,7 +37,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
    */
   @Test
   public void testAggregationWithoutWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2";
 
     PCollection result =
         inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
@@ -125,7 +125,7 @@ public void testAggregationFunctions() throws Exception{
    */
   @Test
   public void testDistinct() throws Exception {
-    String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
+    String sql = "SELECT distinct f_int, f_long FROM TABLE_A ";
 
     PCollection result =
         inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
@@ -190,7 +190,7 @@ public void testTumbleWindow() throws Exception {
    */
   @Test
   public void testHopWindow() throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION "
+    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
         + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
 
     PCollection result = inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
index 254b96d71c3e..b68e52696620 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java
@@ -33,7 +33,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
    */
   @Test
   public void testSingleFilter() throws Exception {
-    String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
+    String sql = "SELECT * FROM TABLE_A WHERE f_int = 1";
 
     PCollection result =
         inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 1faa4d0c56a1..2998682e16c6 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -36,7 +36,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
    */
   @Test
   public void testSelectAll() throws Exception {
-    String sql = "SELECT * FROM PCOLLECTION";
+    String sql = "SELECT * FROM TABLE_A";
 
     PCollection result =
         inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));