From 1f612049b83a67070d13aae790d61e0f71d79ca7 Mon Sep 17 00:00:00 2001 From: mingmxu Date: Thu, 22 Jun 2017 16:50:58 -0700 Subject: [PATCH] use static table name PCOLLECTION in BeamSql.simpleQuery. --- .../org/apache/beam/dsls/sql/BeamSql.java | 27 ++++++++++++++----- .../dsls/sql/BeamSqlDslAggregationTest.java | 6 ++--- .../beam/dsls/sql/BeamSqlDslFilterTest.java | 2 +- .../beam/dsls/sql/BeamSqlDslProjectTest.java | 2 +- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index e68188bf25b4..5f9038039b08 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -50,9 +50,8 @@ .apply(...); //run a simple query, and register the output as a table in BeamSql; -String sql1 = "select MY_FUNC(c1), c2 from TABLE_A"; -PCollection outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1) - .withUdf("MY_FUNC", myFunc)); +String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION"; +PCollection outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)); //run a JOIN with one table from TextIO, and one table from another query PCollection outputTableB = PCollectionTuple.of( @@ -91,6 +90,8 @@ public static PTransform> query(String * *

This is a simplified form of {@link #query(String)} where the query must reference * a single input table. + * + *

Make sure to query it from a static table name PCOLLECTION. */ public static PTransform, PCollection> simpleQuery(String sqlQuery) throws Exception { @@ -151,15 +152,20 @@ private void registerTables(PCollectionTuple input){ */ private static class SimpleQueryTransform extends PTransform, PCollection> { + private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; BeamSqlEnv sqlEnv = new BeamSqlEnv(); private String sqlQuery; public SimpleQueryTransform(String sqlQuery) { this.sqlQuery = sqlQuery; + validateQuery(); } - @Override - public PCollection expand(PCollection input) { + // public SimpleQueryTransform withUdf(String udfName){ + // throw new UnsupportedOperationException("Pending for UDF support"); + // } + + private void validateQuery() { SqlNode sqlNode; try { sqlNode = sqlEnv.planner.parseQuery(sqlQuery); @@ -171,12 +177,19 @@ public PCollection expand(PCollection input) { if (sqlNode instanceof SqlSelect) { SqlSelect select = (SqlSelect) sqlNode; String tableName = select.getFrom().toString(); - return PCollectionTuple.of(new TupleTag(tableName), input) - .apply(new QueryTransform(sqlQuery, sqlEnv)); + if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) { + throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME); + } } else { throw new UnsupportedOperationException( "Sql operation: " + sqlNode.toString() + " is not supported!"); } } + + @Override + public PCollection expand(PCollection input) { + return PCollectionTuple.of(new TupleTag(PCOLLECTION_TABLE_NAME), input) + .apply(new QueryTransform(sqlQuery, sqlEnv)); + } } } diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java index f7349c6e40cd..b0509ae1a330 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java @@ -37,7 +37,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { */ @Test public void testAggregationWithoutWindow() throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2"; + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; PCollection result = inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); @@ -125,7 +125,7 @@ public void testAggregationFunctions() throws Exception{ */ @Test public void testDistinct() throws Exception { - String sql = "SELECT distinct f_int, f_long FROM TABLE_A "; + String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; PCollection result = inputA1.apply("testDistinct", BeamSql.simpleQuery(sql)); @@ -190,7 +190,7 @@ public void testTumbleWindow() throws Exception { */ @Test public void testHopWindow() throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION " + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; PCollection result = inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql)); diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java index b68e52696620..254b96d71c3e 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java @@ -33,7 +33,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase { */ @Test public void testSingleFilter() throws Exception { - String sql = "SELECT * FROM TABLE_A WHERE f_int = 1"; + String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1"; PCollection result = inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql)); diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java index 2998682e16c6..1faa4d0c56a1 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java @@ -36,7 +36,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { */ @Test public void testSelectAll() throws Exception { - String sql = "SELECT * FROM TABLE_A"; + String sql = "SELECT * FROM PCOLLECTION"; PCollection result = inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));