From 804a9fd07a3beda2ae9e9c89b1897a2fed4c479a Mon Sep 17 00:00:00 2001
From: Daniel Collins
Date: Thu, 25 Feb 2021 09:47:45 -0500
Subject: [PATCH] Refactor ZetaSqlDialectSpecTest and add some passing tests.

---
 .../sdk/extensions/sql/zetasql/TestInput.java |   6 +-
 .../sql/zetasql/ZetaSqlDialectSpecTest.java   | 764 ++++++------
 .../sql/zetasql/ZetaSqlTestBase.java          |   1 +
 3 files changed, 236 insertions(+), 535 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
index 322767f295e9..292dd4ca5831 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java
@@ -255,8 +255,12 @@ class TestInput {
               .addValue(Row.withSchema(STRUCT_OF_ARRAY).addArray("2", "3").build())
               .build());
 
-  private static final Schema STRUCT_OF_STRUCT =
+  public static final Schema STRUCT_OF_STRUCT =
       Schema.builder().addRowField("row", STRUCT_SCHEMA).build();
+  public static final TestBoundedTable TABLE_WITH_STRUCT_OF_STRUCT =
+      TestBoundedTable.of(STRUCT_OF_STRUCT)
+          .addRows(Row.withSchema(STRUCT_SCHEMA).attachValues(1L, "1"))
+          .addRows(Row.withSchema(STRUCT_SCHEMA).attachValues(2L, "2"));
   public static final TestBoundedTable TABLE_WITH_ARRAY_OF_STRUCT_OF_STRUCT =
       TestBoundedTable.of(
               Schema.builder().addArrayField("array_col", FieldType.row(STRUCT_OF_STRUCT)).build())
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index 2f6e4cd65032..9f4d1b90bb35 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
+import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.schemas.Schema;
@@ -71,6 +72,24 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
 
+  private PCollection<Row> execute(String sql, QueryParameters params) {
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
+    return BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  private PCollection<Row> execute(String sql) {
+    return execute(sql, QueryParameters.ofNone());
+  }
+
+  private PCollection<Row> execute(String sql, Map<String, Value> params) {
+    return execute(sql, QueryParameters.ofNamed(params));
+  }
+
+  private PCollection<Row> execute(String sql, List<Value> params) {
+    return execute(sql, QueryParameters.ofPositional(params));
+  }
+
   @Before
   public void setUp() {
     initialize();
@@ -83,9 +102,7 @@ public void testSimpleSelect() {
             + "CAST ('2018-09-15
12:59:59.000000+00' as TIMESTAMP), " + "CAST ('string' as STRING);"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() .addInt64Field("field1") @@ -173,9 +190,7 @@ public void testByteLiterals() { byte[] byteString = new byte[] {'a', 'b', 'c'}; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addNullableField("ColA", FieldType.BYTES).build(); @@ -193,9 +208,7 @@ public void testByteString() { ImmutableMap params = ImmutableMap.builder().put("p0", Value.createBytesValue(byteString)).build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("ColA", FieldType.BOOLEAN).build(); @@ -208,9 +221,7 @@ public void testByteString() { public void testStringLiterals() { String sql = "SELECT '\"America/Los_Angeles\"\\n'"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addNullableField("ColA", FieldType.STRING).build(); @@ -225,9 +236,7 @@ public void testParameterString() { String sql = "SELECT ?"; ImmutableList params = ImmutableList.of(Value.createStringValue("abc\n")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("ColA", FieldType.STRING).build(); @@ -246,9 +255,7 @@ public void testEQ1() { .put("p1", Value.createBoolValue(true)) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); PAssert.that(stream) @@ -267,9 +274,7 @@ public void testEQ2() { .put("p1", Value.createDoubleValue(Double.POSITIVE_INFINITY)) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addBooleanField("field1").build(); @@ -287,9 +292,7 @@ public void testEQ3() { .put("p1", Value.createDoubleValue(3.14)) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = 
zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -308,9 +311,7 @@ public void testEQ4() { .put("p1", Value.createBytesValue(ByteString.copyFromUtf8("hello"))) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -321,9 +322,7 @@ public void testEQ4() { @Test public void testEQ5() { String sql = "SELECT b'hello' = b'hello' AS ColA"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -336,9 +335,7 @@ public void testEQ6() { String sql = "SELECT ? = ? AS ColA"; ImmutableList params = ImmutableList.of(Value.createInt64Value(4L), Value.createInt64Value(5L)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -350,9 +347,7 @@ public void testEQ6() { public void testIn() { String sql = "SELECT 'b' IN ('a', 'b', 'c')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -368,9 +363,7 @@ public void testIn() { public void testInArray() { String sql = "SELECT 'b' IN UNNEST(['a', 'b', 'c'])"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -388,9 +381,7 @@ public void testIsNotNull1() { ImmutableMap params = ImmutableMap.of("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -407,9 +398,7 @@ public void testIsNotNull2() { Value.createNullValue( TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)))); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = 
zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -429,9 +418,7 @@ public void testIsNotNull3() { new StructField( "a", TypeFactory.createSimpleType(TypeKind.TYPE_STRING)))))); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -451,9 +438,7 @@ public void testIfBasic() { "p2", Value.createInt64Value(2)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.INT64).build(); @@ -469,9 +454,7 @@ public void testIfPositional() { ImmutableList.of( Value.createBoolValue(true), Value.createInt64Value(1), Value.createInt64Value(2)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.INT64).build(); @@ -491,9 +474,7 @@ public void testCoalesceBasic() { "p2", Value.createStringValue("nay")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -507,9 +488,7 @@ public void testCoalesceSingleArgument() { ImmutableMap params = ImmutableMap.of("p0", Value.createSimpleNullValue(TypeKind.TYPE_INT64)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.array(FieldType.INT64)).build(); @@ -530,9 +509,7 @@ public void testCoalesceNullArray() { Value.createNullValue( TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)))); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.array(FieldType.INT64)).build(); @@ -551,9 +528,7 @@ public void testNullIfCoercion() { "p1", Value.createSimpleNullValue(TypeKind.TYPE_DOUBLE)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = 
zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.DOUBLE).build(); @@ -596,9 +571,7 @@ public void testIfTimestamp() { Value.createTimestampValueFromUnixMicros( DateTime.parse("2019-01-01T00:00:00Z").getMillis() * 1000)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", DATETIME).build(); @@ -634,9 +607,7 @@ public void testNullIfPositive() { ImmutableMap.of( "p0", Value.createStringValue("null"), "p1", Value.createStringValue("null")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -651,9 +622,7 @@ public void testNullIfNegative() { ImmutableMap.of( "p0", Value.createStringValue("foo"), "p1", Value.createStringValue("null")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -668,9 +637,7 @@ public void testIfNullPositive() { ImmutableMap.of( "p0", Value.createStringValue("foo"), "p1", Value.createStringValue("default")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -688,9 +655,7 @@ public void testIfNullNegative() { "p1", Value.createStringValue("yay")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -708,9 +673,7 @@ public void testEmptyArrayParameter() { TypeFactory.createArrayType(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)), ImmutableList.of())); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addArrayField("field1", FieldType.INT64).build(); @@ -722,9 +685,7 @@ public void testEmptyArrayParameter() { @Test public void testEmptyArrayLiteral() { String sql = "SELECT ARRAY[];"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new 
ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addArrayField("field1", FieldType.STRING).build(); @@ -740,9 +701,7 @@ public void testLike1() { ImmutableMap.of( "p0", Value.createStringValue("ab%"), "p1", Value.createStringValue("ab\\%")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -760,9 +719,7 @@ public void testLikeNullPattern() { "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -777,9 +734,7 @@ public void testLikeAllowsEscapingNonSpecialCharacter() { ImmutableMap params = ImmutableMap.of("p0", Value.createStringValue("ab"), "p1", Value.createStringValue("\\ab")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -794,9 +749,7 @@ public void testLikeAllowsEscapingBackslash() { ImmutableMap.of( "p0", Value.createStringValue("a\\c"), "p1", Value.createStringValue("a\\\\c")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -814,9 +767,7 @@ public void testLikeBytes() { "p1", Value.createBytesValue(ByteString.copyFromUtf8("__%"))); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); @@ -835,9 +786,7 @@ public void testSimpleUnionAll() { + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), " + "CAST ('string' as STRING);"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -866,9 +815,7 @@ public void testSimpleUnionAll() { @Test public void testThreeWayUnionAll() { String sql = "SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3)"; - ZetaSQLQueryPlanner 
zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").build(); @@ -891,9 +838,7 @@ public void testSimpleUnionDISTINCT() { + "CAST ('2018-09-15 12:59:59.000000+00' as TIMESTAMP), " + "CAST ('string' as STRING);"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -922,9 +867,7 @@ public void testZetaSQLInnerJoin() { + " on " + " t1.Key = t2.RowKey AND t1.ts = t2.ts"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -939,9 +882,7 @@ public void testZetaSQLInnerJoin() { public void testZetaSQLInnerJoinWithUsing() { String sql = "SELECT t1.Key " + "FROM KeyValue AS t1" + " INNER JOIN BigTable AS t2 USING(ts)"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -961,9 +902,7 @@ public void testZetaSQLInnerJoinTwo() { + " on " + " t2.RowKey = t1.Key AND t2.ts = t1.ts"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -982,9 +921,7 @@ public void testZetaSQLLeftOuterJoin() { + " on " + " t1.Key = t2.RowKey"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schemaOne = Schema.builder() @@ -1038,9 +975,7 @@ public void testZetaSQLRightOuterJoin() { + " on " + " t1.Key = t2.RowKey"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schemaOne = Schema.builder() @@ -1094,9 +1029,7 @@ public void testZetaSQLFullOuterJoin() { + " on " + " t1.Key = t2.RowKey"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schemaOne = Schema.builder() @@ -1196,9 +1129,7 @@ public void testZetaSQLThreeWayInnerJoin() { + "JOIN Spanner as t3 " + "ON (t3.ColId = t1.Key)"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = 
zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -1223,9 +1154,7 @@ public void testZetaSQLTableJoinOnItselfWithFiltering() { + "JOIN Spanner as t2 " + "ON (t1.ColId = t2.ColId) WHERE t1.ColId = 17"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -1246,9 +1175,7 @@ public void testZetaSQLTableJoinOnItselfWithFiltering() { public void testZetaSQLSelectFromSelect() { String sql = "SELECT * FROM (SELECT \"apple\" AS fruit, \"carrot\" AS vegetable);"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").addStringField("field2").build(); @@ -1268,9 +1195,7 @@ public void testZetaSQLSelectFromSelect() { public void testZetaSQLSelectFromTable() { String sql = "SELECT Key, Value FROM KeyValue;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); @@ -1286,9 +1211,7 @@ public void testZetaSQLSelectFromTable() { public void testZetaSQLSelectFromTableLimit() { String sql = "SELECT Key, Value FROM KeyValue LIMIT 2;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); PAssert.that(stream) @@ -1302,9 +1225,7 @@ public void testZetaSQLSelectFromTableLimit() { @Test public void testZetaSQLSelectFromTableLimit0() { String sql = "SELECT Key, Value FROM KeyValue LIMIT 0;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream).containsInAnyOrder(); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } @@ -1340,9 +1261,7 @@ public void testZetaSQLSelectFromTableOrderLimit() { String sql = "SELECT x, y FROM (SELECT 1 as x, 0 as y UNION ALL SELECT 0, 0 " + "UNION ALL SELECT 1, 0 UNION ALL SELECT 1, 1) ORDER BY x LIMIT 1"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(0L, 0L).build()); 
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -1353,9 +1272,7 @@ public void testZetaSQLSelectFromTableLimitOffset() { String sql = "SELECT COUNT(a) FROM (\n" + "SELECT a FROM (SELECT 1 a UNION ALL SELECT 2 UNION ALL SELECT 3) LIMIT 3 OFFSET 1);"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(2L).build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -1366,9 +1283,7 @@ public void testZetaSQLSelectFromTableLimitOffset() { @Test public void testZetaSQLSelectFromTableOrderByLimit() { String sql = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC LIMIT 2;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); PAssert.that(stream) @@ -1393,9 +1308,7 @@ public void testZetaSQLSelectFromTableWithStructType2() { String sql = "SELECT table_with_struct.struct_col.struct_col_str FROM table_with_struct WHERE id = 1;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue("row_one").build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -1407,9 +1320,7 @@ public void testZetaSQLStructFieldAccessInFilter() { "SELECT table_with_struct.id FROM table_with_struct WHERE" + " table_with_struct.struct_col.struct_col_str = 'row_one';"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(1L).build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -1421,9 +1332,7 @@ public void testZetaSQLStructFieldAccessInCast() { "SELECT CAST(table_with_struct.id AS STRING) FROM table_with_struct WHERE" + " table_with_struct.struct_col.struct_col_str = 'row_one';"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue("1").build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ 
-1436,9 +1345,7 @@ public void testZetaSQLStructFieldAccessInCast2() { "SELECT CAST(A.struct_col.struct_col_str AS TIMESTAMP) FROM table_with_struct_ts_string AS" + " A"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addDateTimeField("field").build(); PAssert.that(stream) .containsInAnyOrder( @@ -1491,9 +1398,7 @@ public void testAggregateWithAndWithoutColumnRefs() { public void testZetaSQLStructFieldAccessInGroupBy() { String sql = "SELECT rowCol.row_id, COUNT(*) FROM table_with_struct_two GROUP BY rowCol.row_id"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); PAssert.that(stream) .containsInAnyOrder( @@ -1508,9 +1413,7 @@ public void testZetaSQLAnyValueInGroupBy() { String sql = "SELECT rowCol.row_id as key, ANY_VALUE(rowCol.data) as any_value FROM table_with_struct_two GROUP BY rowCol.row_id"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Map> allowedTuples = new HashMap<>(); allowedTuples.put(1L, Arrays.asList("data1")); allowedTuples.put(2L, Arrays.asList("data2")); @@ -1539,9 +1442,7 @@ public void testZetaSQLStructFieldAccessInGroupBy2() { "SELECT rowCol.data, MAX(rowCol.row_id), MIN(rowCol.row_id) FROM table_with_struct_two" + " GROUP BY rowCol.data"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() .addStringField("field1") @@ -1563,9 +1464,7 @@ public void testZetaSQLStructFieldAccessInnerJoin() { + "table_with_struct AS B " + "ON A.rowCol.row_id = B.id"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream) .containsInAnyOrder( @@ -1578,9 +1477,7 @@ public void testZetaSQLStructFieldAccessInnerJoin() { public void testZetaSQLSelectFromTableWithArrayType() { String sql = "SELECT array_col FROM table_with_array;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addArrayField("field", FieldType.STRING).build(); @@ -1596,9 +1493,7 @@ public void testZetaSQLSelectFromTableWithArrayType() { public void testZetaSQLSelectStarFromTable() { String sql = "SELECT * FROM BigTable;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new 
ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -1629,9 +1524,7 @@ public void testZetaSQLSelectStarFromTable() { public void testZetaSQLBasicFiltering() { String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( Row.withSchema( @@ -1646,9 +1539,7 @@ public void testZetaSQLBasicFiltering() { public void testZetaSQLBasicFilteringTwo() { String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14 AND Value = 'non-existing';"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream).containsInAnyOrder(); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -1658,9 +1549,7 @@ public void testZetaSQLBasicFilteringTwo() { public void testZetaSQLBasicFilteringThree() { String sql = "SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); @@ -1676,9 +1565,7 @@ public void testZetaSQLBasicFilteringThree() { public void testZetaSQLCountOnAColumn() { String sql = "SELECT COUNT(Key) FROM KeyValue"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").build(); @@ -1700,9 +1587,7 @@ public void testZetaSQLAggDistinct() { public void testZetaSQLBasicAgg() { String sql = "SELECT Key, COUNT(*) FROM KeyValue GROUP BY Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); @@ -1718,9 +1603,7 @@ public void testZetaSQLBasicAgg() { public void testZetaSQLColumnAlias1() { String sql = "SELECT Key, COUNT(*) AS count_col FROM KeyValue GROUP BY Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); Schema outputSchema = stream.getSchema(); @@ -1735,9 +1618,7 @@ public void testZetaSQLColumnAlias2() { "SELECT 
Key AS k1, (count_col + 1) AS k2 FROM (SELECT Key, COUNT(*) AS count_col FROM" + " KeyValue GROUP BY Key)"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); Schema outputSchema = stream.getSchema(); @@ -1750,9 +1631,7 @@ public void testZetaSQLColumnAlias2() { public void testZetaSQLColumnAlias3() { String sql = "SELECT Key AS v1, Value AS v2, ts AS v3 FROM KeyValue"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); Schema outputSchema = stream.getSchema(); @@ -1766,9 +1645,7 @@ public void testZetaSQLColumnAlias3() { public void testZetaSQLColumnAlias4() { String sql = "SELECT CAST(123 AS INT64) AS cast_col"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); Schema outputSchema = stream.getSchema(); @@ -1792,9 +1669,7 @@ public void testZetaSQLAmbiguousAlias() { public void testZetaSQLAggWithOrdinalReference() { String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY 1"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); @@ -1811,9 +1686,7 @@ public void testZetaSQLAggWithOrdinalReference() { public void testZetaSQLAggWithAliasReference() { String sql = "SELECT Key AS K, COUNT(*) FROM aggregate_test_table GROUP BY K"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); @@ -1830,9 +1703,7 @@ public void testZetaSQLAggWithAliasReference() { public void testZetaSQLBasicAgg2() { String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); @@ -1849,9 +1720,7 @@ public void testZetaSQLBasicAgg2() { public void testZetaSQLBasicAgg3() { String sql = "SELECT Key, Key2, COUNT(*) FROM aggregate_test_table GROUP BY Key2, Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - 
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -1877,9 +1746,7 @@ public void testZetaSQLBasicAgg4() { "SELECT Key, Key2, MAX(f_int_1), MIN(f_int_1), SUM(f_int_1), SUM(f_double_1) " + "FROM aggregate_test_table GROUP BY Key2, Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -1908,9 +1775,7 @@ public void testZetaSQLBasicAgg5() { "SELECT Key, Key2, AVG(CAST(f_int_1 AS FLOAT64)), AVG(f_double_1) " + "FROM aggregate_test_table GROUP BY Key2, Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -1938,9 +1803,7 @@ public void testZetaSQLBasicAgg5() { public void testZetaSQLTestAVG() { String sql = "SELECT Key, AVG(f_int_1)" + "FROM aggregate_test_table GROUP BY Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -1961,9 +1824,7 @@ public void testZetaSQLTestAVG() { @Test public void testZetaSQLGroupByExprInSelect() { String sql = "SELECT int64_col + 1 FROM table_all_types GROUP BY int64_col + 1;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field").build(); @@ -1981,9 +1842,7 @@ public void testZetaSQLGroupByExprInSelect() { @Test public void testZetaSQLGroupByAndFiltering() { String sql = "SELECT int64_col FROM table_all_types WHERE int64_col = 1 GROUP BY int64_col;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream).containsInAnyOrder(); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } @@ -1991,9 +1850,7 @@ public void testZetaSQLGroupByAndFiltering() { @Test public void testZetaSQLGroupByAndFilteringOnNonGroupByColumn() { String sql = "SELECT int64_col FROM table_all_types WHERE double_col = 0.5 GROUP BY int64_col;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field").build(); PAssert.that(stream) .containsInAnyOrder( @@ -2006,9 +1863,7 @@ public void testZetaSQLGroupByAndFilteringOnNonGroupByColumn() { public void testZetaSQLBasicHaving() { 
String sql = "SELECT Key, COUNT(*) FROM aggregate_test_table GROUP BY Key HAVING COUNT(*) > 2"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); @@ -2021,9 +1876,7 @@ public void testZetaSQLBasicHaving() { public void testZetaSQLHavingNull() { String sql = "SELECT SUM(int64_val) FROM all_null_table GROUP BY primary_key HAVING false"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field").build(); @@ -2076,9 +1929,7 @@ public void testZetaSQLNestedQueryOne() { "SELECT a.Value, a.Key FROM (SELECT Key, Value FROM KeyValue WHERE Key = 14 OR Key = 15)" + " as a;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build(); @@ -2098,9 +1949,7 @@ public void testZetaSQLNestedQueryTwo() { + " (SELECT * FROM aggregate_test_table WHERE Key != 10) as a " + " GROUP BY a.Key2, a.Key"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -2127,9 +1976,7 @@ public void testZetaSQLNestedQueryThree() { "SELECT * FROM (SELECT * FROM KeyValue) AS t1 INNER JOIN (SELECT * FROM BigTable) AS t2 on" + " t1.Key = t2.RowKey"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -2160,9 +2007,7 @@ public void testZetaSQLNestedQueryFive() { "SELECT a.Value, a.Key FROM (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15)" + " as a;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field2").addInt64Field("field1").build(); @@ -2186,9 +2031,7 @@ public void testMultipleSelectStatementsThrowsException() { @Test public void testDistinct() { String sql = "SELECT DISTINCT Key2 FROM aggregate_test_table"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addInt64Field("Key2").build(); PAssert.that(stream) @@ -2203,9 +2046,7 @@ public void testDistinct() { 
@Test public void testDistinctOnNull() { String sql = "SELECT DISTINCT str_val FROM all_null_table"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addNullableField("str_val", FieldType.DOUBLE).build(); PAssert.that(stream) @@ -2216,9 +2057,7 @@ public void testDistinctOnNull() { @Test public void testAnyValue() { String sql = "SELECT ANY_VALUE(double_val) FROM all_null_table"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addNullableField("double_val", FieldType.DOUBLE).build(); PAssert.that(stream) @@ -2229,9 +2068,7 @@ public void testAnyValue() { @Test public void testSelectNULL() { String sql = "SELECT NULL"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addNullableField("long_val", FieldType.INT64).build(); PAssert.that(stream) @@ -2244,9 +2081,7 @@ public void testWithQueryOne() { String sql = "With T1 AS (SELECT * FROM KeyValue), T2 AS (SELECT * FROM BigTable) SELECT T2.RowKey FROM" + " T1 INNER JOIN T2 on T1.Key = T2.RowKey;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( Row.withSchema(Schema.builder().addInt64Field("field1").build()) @@ -2261,9 +2096,7 @@ public void testWithQueryTwo() { "WITH T1 AS (SELECT Key, COUNT(*) as value FROM KeyValue GROUP BY Key) SELECT T1.Key," + " T1.value FROM T1"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build(); @@ -2281,9 +2114,7 @@ public void testWithQueryThree() { "WITH T1 as (SELECT Value, Key FROM KeyValue WHERE Key = 14 OR Key = 15) SELECT T1.Value," + " T1.Key FROM T1;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").addInt64Field("field2").build(); @@ -2301,9 +2132,7 @@ public void testWithQueryFour() { "WITH T1 as (SELECT Value, Key FROM KeyValue) SELECT T1.Value, T1.Key FROM T1 WHERE T1.Key" + " = 14 OR T1.Key = 15;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); 
     final Schema schema =
         Schema.builder().addStringField("field2").addInt64Field("field1").build();
@@ -2320,9 +2149,7 @@ public void testWithQueryFive() {
     String sql =
         "WITH T1 AS (SELECT * FROM KeyValue) SELECT T1.Key, COUNT(*) FROM T1 GROUP BY T1.Key";
 
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection<Row> stream = execute(sql);
 
     final Schema schema =
         Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
@@ -2505,9 +2332,7 @@ public void testUnnestArrayColumn() {
     String sql =
         "SELECT p FROM table_with_array_for_unnest, UNNEST(table_with_array_for_unnest.int_array_col) as p";
 
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection<Row> stream = execute(sql);
 
     Schema schema = Schema.builder().addInt64Field("int_field").build();
     PAssert.that(stream)
@@ -2524,9 +2349,7 @@ public void testUnnestArrayColumn() {
   public void testUnnestArrayOfStructColumn() {
     String sql = "SELECT int_col, data FROM table_with_array_of_struct, UNNEST(array_col) AS s";
 
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection<Row> stream = execute(sql);
 
     Schema schema = Schema.builder().addInt64Field("int_col").addStringField("data").build();
     PAssert.that(stream)
@@ -2542,9 +2365,7 @@ public void testUnnestArrayOfStructColumn() {
   public void testUnnestArrayOfStructLiteral() {
     String sql = "SELECT a, b FROM UNNEST([STRUCT(1 AS a, '1' AS b), STRUCT(2, '2')])";
 
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection<Row> stream = execute(sql);
 
     Schema schema = Schema.builder().addInt64Field("a").addStringField("b").build();
     PAssert.that(stream)
@@ -2555,14 +2376,45 @@ public void testUnnestArrayOfStructLiteral() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test
+  public void testStructOfStructPassthrough() {
+    String sql = "SELECT * FROM table_with_struct_of_struct";
+
+    PCollection<Row> stream = execute(sql);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(TestInput.STRUCT_OF_STRUCT)
+                .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")),
+            Row.withSchema(TestInput.STRUCT_OF_STRUCT)
+                .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2")));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testStructOfStructSimpleRename() {
+    String sql = "SELECT row as not_row FROM table_with_struct_of_struct";
+
+    PCollection<Row> stream = execute(sql);
+
+    Schema schema = Schema.builder().addRowField("not_row", TestInput.STRUCT_SCHEMA).build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema)
+                .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")),
+            Row.withSchema(schema)
+                .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2")));
+
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + @Test public void testUnnestStructOfStructOfArray() { String sql = "SELECT int_col, s FROM table_with_struct_of_struct_of_array, UNNEST(struct_col.struct.arr) as s"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addInt64Field("int_col").addStringField("p").build(); PAssert.that(stream) @@ -2578,9 +2430,7 @@ public void testUnnestStructOfStructOfArray() { public void testUnnestArrayOfStructOfStructColumn() { String sql = "SELECT s.row FROM table_with_array_of_struct_of_struct, UNNEST(array_col) as s"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addRowField("row", TestInput.STRUCT_SCHEMA).build(); PAssert.that(stream) @@ -2600,9 +2450,7 @@ public void testUnnestArrayOfStructOfStructLiteral() { String sql = "SELECT s.row FROM UNNEST([STRUCT(STRUCT(1, '1') as row), STRUCT(STRUCT(2, '2'))]) as s"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addRowField("row", TestInput.STRUCT_SCHEMA).build(); PAssert.that(stream) @@ -2622,9 +2470,7 @@ public void testUnnestStructOfArrayOfStructColumn() { String sql = "SELECT int_col, data FROM table_with_struct_of_array_of_struct, UNNEST(struct_col.arr) as s"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addInt64Field("int_col").addStringField("p").build(); PAssert.that(stream) @@ -2641,9 +2487,7 @@ public void testStringAggregation() { String sql = "SELECT STRING_AGG(fruit) AS string_agg" + " FROM UNNEST([\"apple\", \"pear\", \"banana\", \"pear\"]) AS fruit"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema schema = Schema.builder().addStringField("string_field").build(); PAssert.that(stream) @@ -2662,9 +2506,7 @@ public void testNamedUNNESTJoin() { + " on " + " t1.int_col = t2"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream).containsInAnyOrder(); @@ -2713,9 +2555,7 @@ public void testUnnestJoinSubquery() { public void testCaseNoValue() { String sql = "SELECT CASE WHEN 1 > 2 THEN 'not possible' ELSE 'seems right' END"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = 
zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -2730,9 +2570,7 @@ public void testCaseNoValue() { public void testCaseWithValue() { String sql = "SELECT CASE 1 WHEN 2 THEN 'not possible' ELSE 'seems right' END"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -2749,9 +2587,7 @@ public void testCaseWithValueMultipleCases() { "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' ELSE 'also not" + " possible' END"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -2766,9 +2602,7 @@ public void testCaseWithValueMultipleCases() { public void testCaseWithValueNoElse() { String sql = "SELECT CASE 2 WHEN 1 THEN 'not possible' WHEN 2 THEN 'seems right' END"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -2783,9 +2617,7 @@ public void testCaseWithValueNoElse() { public void testCaseNoValueNoElseNoMatch() { String sql = "SELECT CASE WHEN 'abc' = '123' THEN 'not possible' END"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -2800,9 +2632,7 @@ public void testCaseNoValueNoElseNoMatch() { public void testCaseWithValueNoElseNoMatch() { String sql = "SELECT CASE 2 WHEN 1 THEN 'not possible' END"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -2828,9 +2658,7 @@ public void testCastToDateWithCase() { + "END \n" + "FROM table_for_case_when"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema resultType = Schema.builder() @@ -2851,9 +2679,7 @@ public void testIntersectAll() { + "INTERSECT ALL " + "SELECT Key FROM aggregate_test_table_two"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema resultType = Schema.builder().addInt64Field("field").build(); @@ -2874,9 +2700,7 @@ public void testIntersectDistinct() { + "INTERSECT DISTINCT " + "SELECT 
Key FROM aggregate_test_table_two"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema resultType = Schema.builder().addInt64Field("field").build(); @@ -2895,9 +2719,7 @@ public void testExceptAll() { + "EXCEPT ALL " + "SELECT Key FROM aggregate_test_table_two"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema resultType = Schema.builder().addInt64Field("field").build(); @@ -2912,9 +2734,7 @@ public void testExceptAll() { public void testSelectNullIntersectDistinct() { String sql = "SELECT NULL INTERSECT DISTINCT SELECT 2"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); System.err.println("SCHEMA " + stream.getSchema()); PAssert.that(stream).empty(); @@ -2925,9 +2745,7 @@ public void testSelectNullIntersectDistinct() { public void testSelectNullIntersectAll() { String sql = "SELECT NULL INTERSECT ALL SELECT 2"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); System.err.println("SCHEMA " + stream.getSchema()); PAssert.that(stream).empty(); @@ -2938,9 +2756,7 @@ public void testSelectNullIntersectAll() { public void testSelectNullExceptDistinct() { String sql = "SELECT NULL EXCEPT DISTINCT SELECT 2"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream).containsInAnyOrder(Row.nullRow(stream.getSchema())); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -2950,9 +2766,7 @@ public void testSelectNullExceptDistinct() { public void testSelectNullExceptAll() { String sql = "SELECT NULL EXCEPT ALL SELECT 2"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream).containsInAnyOrder(Row.nullRow(stream.getSchema())); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -2961,9 +2775,7 @@ public void testSelectNullExceptAll() { @Test public void testSelectFromEmptyTable() { String sql = "SELECT * FROM table_empty;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream).containsInAnyOrder(); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } 
@@ -2971,9 +2783,7 @@ public void testSelectFromEmptyTable() { @Test public void testStartsWithString() { String sql = "SELECT STARTS_WITH('string1', 'stri')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(true).build()); @@ -2990,9 +2800,7 @@ public void testStartsWithString2() { .put("p1", Value.createStringValue("")) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); PAssert.that(stream) @@ -3010,9 +2818,7 @@ public void testStartsWithString3() { .put("p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); PAssert.that(stream) @@ -3023,9 +2829,7 @@ public void testStartsWithString3() { @Test public void testEndsWithString() { String sql = "SELECT STARTS_WITH('string1', 'ng0')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build()); @@ -3042,9 +2846,7 @@ public void testEndsWithString2() { .put("p1", Value.createStringValue("")) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); PAssert.that(stream) @@ -3062,9 +2864,7 @@ public void testEndsWithString3() { .put("p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)) .build(); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build(); PAssert.that(stream) @@ -3075,9 +2875,7 @@ public void testEndsWithString3() { @Test public void testConcatWithOneParameters() { String sql = "SELECT concat('abc')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - 
PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abc").build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -3086,9 +2884,7 @@ public void testConcatWithOneParameters() { @Test public void testConcatWithTwoParameters() { String sql = "SELECT concat('abc', 'def')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abcdef").build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -3097,9 +2893,7 @@ public void testConcatWithTwoParameters() { @Test public void testConcatWithThreeParameters() { String sql = "SELECT concat('abc', 'def', 'xyz')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abcdefxyz").build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -3108,9 +2902,7 @@ public void testConcatWithThreeParameters() { @Test public void testConcatWithFourParameters() { String sql = "SELECT concat('abc', 'def', ' ', 'xyz')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream) .containsInAnyOrder(Row.withSchema(schema).addValues("abcdef xyz").build()); @@ -3120,9 +2912,7 @@ public void testConcatWithFourParameters() { @Test public void testConcatWithFiveParameters() { String sql = "SELECT concat('abc', 'def', ' ', 'xyz', 'kkk')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream) .containsInAnyOrder(Row.withSchema(schema).addValues("abcdef xyzkkk").build()); @@ -3132,9 +2922,7 @@ public void testConcatWithFiveParameters() { @Test public void testConcatWithSixParameters() { String sql = "SELECT concat('abc', 'def', ' ', 'xyz', 'kkk', 'ttt')"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream) 
.containsInAnyOrder(Row.withSchema(schema).addValues("abcdef xyzkkkttt").build()); @@ -3150,9 +2938,7 @@ public void testConcatWithNull1() { Value.createStringValue(""), "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); PAssert.that(stream) .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); @@ -3168,9 +2954,7 @@ public void testConcatWithNull2() { Value.createSimpleNullValue(TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); PAssert.that(stream) .containsInAnyOrder(Row.withSchema(schema).addValues((String) null).build()); @@ -3182,9 +2966,7 @@ public void testNamedParameterQuery() { String sql = "SELECT @ColA AS ColA"; ImmutableMap params = ImmutableMap.of("ColA", Value.createInt64Value(5)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addInt64Field("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(5L).build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -3226,9 +3008,7 @@ public void testParameterStruct() { "i", TypeFactory.createSimpleType(TypeKind.TYPE_INT64)))), ImmutableList.of(Value.createStringValue("foo"), Value.createInt64Value(1L)))); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema innerSchema = Schema.of(Field.of("s", FieldType.STRING), Field.of("i", FieldType.INT64)); @@ -3261,9 +3041,7 @@ public void testParameterStructNested() { Value.createStructValue( innerStructType, ImmutableList.of(Value.createStringValue("foo")))))); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3278,9 +3056,7 @@ public void testConcatNamedParameterQuery() { ImmutableMap params = ImmutableMap.of("p0", Value.createStringValue(""), "p1", Value.createStringValue("A")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection 
stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("A").build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -3295,9 +3071,7 @@ public void testConcatPositionalParameterQuery() { Value.createStringValue("b"), Value.createStringValue("c")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abc").build()); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); @@ -3312,9 +3086,7 @@ public void testReplace1() { "p1", Value.createStringValue(""), "p2", Value.createStringValue("a")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3331,9 +3103,7 @@ public void testReplace2() { "p1", Value.createStringValue(""), "p2", Value.createStringValue("xyz")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3350,9 +3120,7 @@ public void testReplace3() { "p1", Value.createStringValue(""), "p2", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -3370,9 +3138,7 @@ public void testReplace4() { "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING), "p2", Value.createStringValue("")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -3387,9 +3153,7 @@ public void testTrim1() { ImmutableMap params = ImmutableMap.of("p0", Value.createStringValue(" a b c ")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3404,9 +3168,7 @@ public void testTrim2() { ImmutableMap.of( "p0", Value.createStringValue("abxyzab"), "p1", 
Value.createStringValue("ab")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3422,9 +3184,7 @@ public void testTrim3() { "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -3439,9 +3199,7 @@ public void testLTrim1() { ImmutableMap params = ImmutableMap.of("p0", Value.createStringValue(" a b c ")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3456,9 +3214,7 @@ public void testLTrim2() { ImmutableMap.of( "p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3474,9 +3230,7 @@ public void testLTrim3() { "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING), "p1", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -3491,9 +3245,7 @@ public void testRTrim1() { ImmutableMap params = ImmutableMap.of("p0", Value.createStringValue(" a b c ")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3508,9 +3260,7 @@ public void testRTrim2() { ImmutableMap.of( "p0", Value.createStringValue("abxyzab"), "p1", Value.createStringValue("ab")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3526,9 +3276,7 @@ public void testRTrim3() { "p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING), "p1", 
Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addNullableField("field1", FieldType.STRING).build(); @@ -3543,9 +3291,7 @@ public void testCastBytesToString1() { String sql = "SELECT CAST(@p0 AS STRING)"; ImmutableMap params = ImmutableMap.of("p0", Value.createBytesValue(ByteString.copyFromUtf8("`"))); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3556,9 +3302,7 @@ public void testCastBytesToString1() { @Test public void testCastBytesToString2() { String sql = "SELECT CAST(b'b' AS STRING)"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3570,9 +3314,7 @@ public void testCastBytesToString2() { @Ignore("https://jira.apache.org/jira/browse/BEAM-9191") public void testCastBytesToStringFromTable() { String sql = "SELECT CAST(bytes_col AS STRING) FROM table_all_types"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3589,9 +3331,7 @@ public void testCastBytesToStringFromTable() { @Test public void testCastStringToTimestamp() { String sql = "SELECT CAST('2019-01-15 13:21:03' AS TIMESTAMP)"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addDateTimeField("field_1").build(); @@ -3633,9 +3373,7 @@ public void testCastBetweenTimeAndString() { "SELECT CAST(s1 as TIME) as t2, CAST(t1 as STRING) as s2 FROM " + "(SELECT '12:34:56.123456' as s1, TIME '12:34:56.123456' as t1)"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); PAssert.that(stream) .containsInAnyOrder( @@ -3653,9 +3391,7 @@ public void testCastBetweenTimeAndString() { public void testCastStringToString() { String sql = "SELECT CAST(@p0 AS STRING)"; ImmutableMap params = ImmutableMap.of("p0", Value.createStringValue("")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); 
final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3667,9 +3403,7 @@ public void testCastStringToString() { public void testCastStringToInt64() { String sql = "SELECT CAST(@p0 AS INT64)"; ImmutableMap params = ImmutableMap.of("p0", Value.createStringValue("123")); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addInt64Field("field1").build(); @@ -3680,9 +3414,7 @@ public void testCastStringToInt64() { @Test public void testSelectConstant() { String sql = "SELECT 'hi'"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field1").build(); @@ -3694,9 +3426,7 @@ public void testSelectConstant() { @Ignore("[BEAM-8593] ZetaSQL does not support Map type") public void testSelectFromTableWithMap() { String sql = "SELECT row_field FROM table_with_map"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); Schema rowSchema = Schema.builder().addInt64Field("row_id").addStringField("data").build(); PAssert.that(stream) .containsInAnyOrder( @@ -3725,9 +3455,7 @@ public void testSubstr() { "p1", Value.createInt64Value(-2L), "p2", Value.createInt64Value(1L)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = Schema.builder().addStringField("field1").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("b").build()); @@ -3754,9 +3482,7 @@ public void testSubstrWithLargeValueExpectException() { public void testSelectAll() { String sql = "SELECT ALL Key, Value FROM KeyValue;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").addStringField("field2").build(); @@ -3772,9 +3498,7 @@ public void testSelectAll() { public void testSelectDistinct() { String sql = "SELECT DISTINCT Key FROM aggregate_test_table;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").build(); @@ -3795,9 +3519,7 @@ public void testSelectDistinct2() { + " select b\"bytes\" union all\n" + " select b\"ByTeS\") val"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - 
PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addByteArrayField("field1").build(); @@ -3813,9 +3535,7 @@ public void testSelectDistinct2() { @Test public void testSelectBytes() { String sql = "SELECT b\"ByTes\""; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addByteArrayField("field1").build(); @@ -3830,9 +3550,7 @@ public void testSelectBytes() { public void testSelectExcept() { String sql = "SELECT * EXCEPT (Key, ts) FROM KeyValue;"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field2").build(); @@ -3854,9 +3572,7 @@ public void testSelectReplace() { + "SELECT * REPLACE (\"widget\" AS item_name)\n" + "FROM orders"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder() @@ -3876,9 +3592,7 @@ public void testUnionAllBasic() { String sql = "SELECT row_id FROM table_all_types UNION ALL SELECT row_id FROM table_all_types_2"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field1").build(); @@ -3912,9 +3626,7 @@ public void testAVGWithLongInput() { public void testReverseString() { String sql = "SELECT REVERSE('abc');"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addStringField("field2").build(); @@ -3927,9 +3639,7 @@ public void testReverseString() { public void testCharLength() { String sql = "SELECT CHAR_LENGTH('abc');"; - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql); final Schema schema = Schema.builder().addInt64Field("field").build(); PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(3L).build()); @@ -3943,9 +3653,7 @@ public void testCharLengthNull() { ImmutableMap params = ImmutableMap.of("p0", Value.createSimpleNullValue(TypeKind.TYPE_STRING)); - ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); - PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + PCollection stream = execute(sql, params); final Schema schema = 
         Schema.builder().addNullableField("field", FieldType.INT64).build();

     PAssert.that(stream)
@@ -4037,9 +3745,7 @@ public void testIsNullTrueFalse() {
   public void testZetaSQLBitOr() {
     String sql = "SELECT BIT_OR(row_id) FROM table_all_types GROUP BY bool_col";

-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection stream = execute(sql);

     final Schema schema = Schema.builder().addInt64Field("field1").build();
     PAssert.that(stream)
@@ -4055,9 +3761,7 @@ public void testZetaSQLBitOr() {
   public void testZetaSQLBitAnd() {
     String sql = "SELECT BIT_AND(row_id) FROM table_all_types GROUP BY bool_col";

-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection stream = execute(sql);

     final Schema schema = Schema.builder().addInt64Field("field1").build();
     PAssert.that(stream)
@@ -4072,9 +3776,7 @@ public void testZetaSQLBitAnd() {
   public void testSimpleTableName() {
     String sql = "SELECT Key FROM KeyValue";

-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection stream = execute(sql);

     Schema singleField = Schema.builder().addInt64Field("field1").build();
     PAssert.that(stream)
@@ -4089,9 +3791,7 @@ public void testSimpleTableName() {
       "Null values are not handled properly, so BIT_XOR is temporarily removed from SupportedZetaSqlBuiltinFunctions. https://issues.apache.org/jira/browse/BEAM-10379")
   public void testZetaSQLBitXor() {
     String sql = "SELECT BIT_XOR(x) AS bit_xor FROM UNNEST([5678, 1234]) AS x";

-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection stream = execute(sql);

     Schema schema = Schema.builder().addInt64Field("field1").build();
     PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(4860L).build());
@@ -4105,9 +3805,7 @@ public void testCountIfZetaSQLDialect() {
         "WITH is_positive AS ( SELECT x > 0 flag FROM UNNEST([5, -2, 3, 6, -10, -7, 4, 0]) AS x) "
            + "SELECT COUNTIF(flag) FROM is_positive";

-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection stream = execute(sql);

     final Schema schema = Schema.builder().addInt64Field("field1").build();
     PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValue(4L).build());
@@ -4119,9 +3817,7 @@ public void testCountIfZetaSQLDialect() {
   public void testArrayAggZetasql() {
     String sql = "SELECT ARRAY_AGG(x) AS array_agg " + "FROM UNNEST([1, 2, 3, 4, 5]) AS x";

-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PCollection stream = execute(sql);

     Schema schema = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
     PAssert.that(stream)
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
index 741aa0128805..043a7819b866 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
@@ -56,6 +56,7 @@ private TableProvider createBeamTableProvider() {
     testBoundedTableMap.put("table_with_array", TestInput.TABLE_WITH_ARRAY);
     testBoundedTableMap.put("table_with_array_for_unnest", TestInput.TABLE_WITH_ARRAY_FOR_UNNEST);
     testBoundedTableMap.put("table_with_array_of_struct", TestInput.TABLE_WITH_ARRAY_OF_STRUCT);
+    testBoundedTableMap.put("table_with_struct_of_struct", TestInput.TABLE_WITH_STRUCT_OF_STRUCT);
     testBoundedTableMap.put(
         "table_with_struct_of_struct_of_array", TestInput.TABLE_WITH_STRUCT_OF_STRUCT_OF_ARRAY);
     testBoundedTableMap.put(