Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@
.apply(...);

//run a simple query, and register the output as a table in BeamSql;
String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)
.withUdf("MY_FUNC", myFunc));
String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1));

//run a JOIN with one table from TextIO, and one table from another query
PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
Expand Down Expand Up @@ -91,6 +90,8 @@ public static PTransform<PCollectionTuple, PCollection<BeamSqlRow>> query(String
*
* <p>This is a simplified form of {@link #query(String)} where the query must reference
* a single input table.
*
* <p>Make sure to query it from a static table name <em>PCOLLECTION</em>.
*/
public static PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>>
simpleQuery(String sqlQuery) throws Exception {
Expand Down Expand Up @@ -151,15 +152,20 @@ private void registerTables(PCollectionTuple input){
*/
private static class SimpleQueryTransform
extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
BeamSqlEnv sqlEnv = new BeamSqlEnv();
private String sqlQuery;

public SimpleQueryTransform(String sqlQuery) {
this.sqlQuery = sqlQuery;
validateQuery();
}

@Override
public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
// public SimpleQueryTransform withUdf(String udfName){
// throw new UnsupportedOperationException("Pending for UDF support");
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best not to check in commented-out code. Since I have no other requests for changes, I'll remove as part of merging.


private void validateQuery() {
SqlNode sqlNode;
try {
sqlNode = sqlEnv.planner.parseQuery(sqlQuery);
Expand All @@ -171,12 +177,19 @@ public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
if (sqlNode instanceof SqlSelect) {
SqlSelect select = (SqlSelect) sqlNode;
String tableName = select.getFrom().toString();
return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
.apply(new QueryTransform(sqlQuery, sqlEnv));
if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
}
} else {
throw new UnsupportedOperationException(
"Sql operation: " + sqlNode.toString() + " is not supported!");
}
}

@Override
public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
.apply(new QueryTransform(sqlQuery, sqlEnv));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
*/
@Test
public void testAggregationWithoutWindow() throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A GROUP BY f_int2";
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";

PCollection<BeamSqlRow> result =
inputA1.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testAggregationFunctions() throws Exception{
*/
@Test
public void testDistinct() throws Exception {
String sql = "SELECT distinct f_int, f_long FROM TABLE_A ";
String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";

PCollection<BeamSqlRow> result =
inputA1.apply("testDistinct", BeamSql.simpleQuery(sql));
Expand Down Expand Up @@ -190,7 +190,7 @@ public void testTumbleWindow() throws Exception {
*/
@Test
public void testHopWindow() throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION "
+ "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
PCollection<BeamSqlRow> result =
inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
*/
@Test
public void testSingleFilter() throws Exception {
String sql = "SELECT * FROM TABLE_A WHERE f_int = 1";
String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";

PCollection<BeamSqlRow> result =
inputA1.apply("testSingleFilter", BeamSql.simpleQuery(sql));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
*/
@Test
public void testSelectAll() throws Exception {
String sql = "SELECT * FROM TABLE_A";
String sql = "SELECT * FROM PCOLLECTION";

PCollection<BeamSqlRow> result =
inputA2.apply("testSelectAll", BeamSql.simpleQuery(sql));
Expand Down