From 77f4bc11b30488c0e98847c6b51cb143f902ea91 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 24 Feb 2021 20:38:45 -0500 Subject: [PATCH 1/3] Tests which fail in various ways when querying nested structures --- .../sql/BeamSqlDslNestedRowsTest.java | 67 +++++++++++++++++++ .../sql/zetasql/ZetaSqlDialectSpecTest.java | 31 ++++++--- 2 files changed, 89 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java index db6cc1421b21..a727dd727360 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java @@ -76,6 +76,73 @@ public void testRowConstructorKeyword() { pipeline.run(); } + @Test + public void testRowAliasAsRow() { + Schema nestedSchema = + Schema.builder() + .addStringField("f_nestedString") + .addInt32Field("f_nestedInt") + .addInt32Field("f_nestedIntPlusOne") + .build(); + + Schema inputType = + Schema.builder().addInt32Field("f_int").addRowField("f_row", nestedSchema).build(); + Schema outputType = + Schema.builder().addInt32Field("f_int").addRowField("f_row1", nestedSchema).build(); + + PCollection input = + pipeline.apply( + Create.of( + Row.withSchema(inputType) + .attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) + .withRowSchema(inputType)); + + PCollection result = + input + .apply( + SqlTransform.query( + "SELECT 1 as `f_int`, f_row as `f_row1` FROM PCOLLECTION")).setRowSchema(outputType); + + PAssert.that(result) + .containsInAnyOrder(Row.withSchema(outputType).attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))); + + pipeline.run(); + } + + @Test + public void testRowConstructorKeywordKeepAsRow() { + Schema nestedSchema = + Schema.builder() + .addStringField("f_nestedString") + .addInt32Field("f_nestedInt") + .addInt32Field("f_nestedIntPlusOne") + .build(); + + Schema inputType = + Schema.builder().addInt32Field("f_int").addRowField("f_row", nestedSchema).build(); + Schema nestedOutput = Schema.builder().addInt32Field("int_field").addStringField("str_field").build(); + Schema outputType = + Schema.builder().addInt32Field("f_int1").addRowField("f_row1", nestedOutput).build(); + + PCollection input = + pipeline.apply( + Create.of( + Row.withSchema(inputType) + .attachValues(2, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) + .withRowSchema(inputType)); + + PCollection result = + input + .apply( + SqlTransform.query( + "SELECT f_int as `f_int1`, (`PCOLLECTION`.`f_row`.`f_nestedInt`, `PCOLLECTION`.`f_row`.`f_nestedString`) as `f_row1` FROM PCOLLECTION")).setRowSchema(outputType); + + PAssert.that(result) + .containsInAnyOrder(Row.withSchema(nestedSchema).attachValues(2, Row.withSchema(nestedOutput).attachValues(312, "CC"))); + + pipeline.run(); + } + @Test public void testRowConstructorBraces() { diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java index 9f4d1b90bb35..71bdd54303f3 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java @@ -2384,10 +2384,8 @@ public void testStructOfStructPassthrough() { PAssert.that(stream) .containsInAnyOrder( - Row.withSchema(TestInput.STRUCT_OF_STRUCT) - .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), - Row.withSchema(TestInput.STRUCT_OF_STRUCT) - .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); + Row.withSchema(TestInput.STRUCT_OF_STRUCT).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), + Row.withSchema(TestInput.STRUCT_OF_STRUCT).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } @@ -2398,13 +2396,28 @@ public void testStructOfStructSimpleRename() { PCollection stream = execute(sql); - Schema schema = Schema.builder().addRowField("not_row", TestInput.STRUCT_SCHEMA).build(); + Schema schema = + Schema.builder().addRowField("not_row", TestInput.STRUCT_SCHEMA).build(); PAssert.that(stream) .containsInAnyOrder( - Row.withSchema(schema) - .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), - Row.withSchema(schema) - .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); + Row.withSchema(schema).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), + Row.withSchema(schema).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testStructOfStructRemap() { + String sql = "SELECT STRUCT(row.row_id AS int_value_remapped) AS remapped FROM table_with_struct_of_struct"; + + PCollection stream = execute(sql); + + Schema nested = Schema.builder().addInt64Field("int_value_remapped").build(); + Schema schema = Schema.builder().addRowField("remapped", nested).build(); + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(schema).attachValues(Row.withSchema(nested).attachValues(1L)), + Row.withSchema(schema).attachValues(Row.withSchema(nested).attachValues(2L))); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } From f5fe4b8d2414cad0daacf0b31c35079cd68a893d Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 25 Feb 2021 12:40:06 -0500 Subject: [PATCH 2/3] Add ignored --- .../sql/BeamSqlDslNestedRowsTest.java | 34 +++++++++++++------ .../sql/zetasql/ZetaSqlDialectSpecTest.java | 19 +++++++---- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java index a727dd727360..a8773927a6a6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -35,6 +36,8 @@ public class BeamSqlDslNestedRowsTest { @Rule public ExpectedException exceptions = ExpectedException.none(); @Test + @Ignore( + "[BEAM-9378] This is a test of the incorrect behavior that should not work but does because calcite flattens the row.") public void testRowConstructorKeyword() { Schema nestedSchema = Schema.builder() @@ -77,6 +80,7 @@ public void testRowConstructorKeyword() { } @Test + @Ignore("[BEAM-9378] This does not work because calcite flattens the row.") public void testRowAliasAsRow() { Schema nestedSchema = Schema.builder() @@ -93,23 +97,25 @@ public void testRowAliasAsRow() { PCollection input = pipeline.apply( Create.of( - Row.withSchema(inputType) - .attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) + Row.withSchema(inputType) + .attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) .withRowSchema(inputType)); PCollection result = input - .apply( - SqlTransform.query( - "SELECT 1 as `f_int`, f_row as `f_row1` FROM PCOLLECTION")).setRowSchema(outputType); + .apply(SqlTransform.query("SELECT 1 as `f_int`, f_row as `f_row1` FROM PCOLLECTION")) + .setRowSchema(outputType); PAssert.that(result) - .containsInAnyOrder(Row.withSchema(outputType).attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))); + .containsInAnyOrder( + Row.withSchema(outputType) + .attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))); pipeline.run(); } @Test + @Ignore("[BEAM-9378] This does not work because calcite flattens the row.") public void testRowConstructorKeywordKeepAsRow() { Schema nestedSchema = Schema.builder() @@ -120,30 +126,36 @@ public void testRowConstructorKeywordKeepAsRow() { Schema inputType = Schema.builder().addInt32Field("f_int").addRowField("f_row", nestedSchema).build(); - Schema nestedOutput = Schema.builder().addInt32Field("int_field").addStringField("str_field").build(); + Schema nestedOutput = + Schema.builder().addInt32Field("int_field").addStringField("str_field").build(); Schema outputType = Schema.builder().addInt32Field("f_int1").addRowField("f_row1", nestedOutput).build(); PCollection input = pipeline.apply( Create.of( - Row.withSchema(inputType) - .attachValues(2, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) + Row.withSchema(inputType) + .attachValues(2, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) .withRowSchema(inputType)); PCollection result = input .apply( SqlTransform.query( - "SELECT f_int as `f_int1`, (`PCOLLECTION`.`f_row`.`f_nestedInt`, `PCOLLECTION`.`f_row`.`f_nestedString`) as `f_row1` FROM PCOLLECTION")).setRowSchema(outputType); + "SELECT f_int as `f_int1`, (`PCOLLECTION`.`f_row`.`f_nestedInt`, `PCOLLECTION`.`f_row`.`f_nestedString`) as `f_row1` FROM PCOLLECTION")) + .setRowSchema(outputType); PAssert.that(result) - .containsInAnyOrder(Row.withSchema(nestedSchema).attachValues(2, Row.withSchema(nestedOutput).attachValues(312, "CC"))); + .containsInAnyOrder( + Row.withSchema(nestedSchema) + .attachValues(2, Row.withSchema(nestedOutput).attachValues(312, "CC"))); pipeline.run(); } @Test + @Ignore( + "[BEAM-9378] This is a test of the incorrect behavior that should not work but does because calcite flattens the row.") public void testRowConstructorBraces() { Schema nestedSchema = diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java index 71bdd54303f3..f46d7b61938c 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java @@ -2384,8 +2384,10 @@ public void testStructOfStructPassthrough() { PAssert.that(stream) .containsInAnyOrder( - Row.withSchema(TestInput.STRUCT_OF_STRUCT).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), - Row.withSchema(TestInput.STRUCT_OF_STRUCT).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); + Row.withSchema(TestInput.STRUCT_OF_STRUCT) + .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), + Row.withSchema(TestInput.STRUCT_OF_STRUCT) + .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } @@ -2396,19 +2398,22 @@ public void testStructOfStructSimpleRename() { PCollection stream = execute(sql); - Schema schema = - Schema.builder().addRowField("not_row", TestInput.STRUCT_SCHEMA).build(); + Schema schema = Schema.builder().addRowField("not_row", TestInput.STRUCT_SCHEMA).build(); PAssert.that(stream) .containsInAnyOrder( - Row.withSchema(schema).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), - Row.withSchema(schema).attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); + Row.withSchema(schema) + .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(1L, "1")), + Row.withSchema(schema) + .attachValues(Row.withSchema(TestInput.STRUCT_SCHEMA).attachValues(2L, "2"))); pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } @Test + @Ignore("[BEAM-9378] This should work, but is currently unimplemented.") public void testStructOfStructRemap() { - String sql = "SELECT STRUCT(row.row_id AS int_value_remapped) AS remapped FROM table_with_struct_of_struct"; + String sql = + "SELECT STRUCT(row.row_id AS int_value_remapped) AS remapped FROM table_with_struct_of_struct"; PCollection stream = execute(sql); From daf8c60f986718b40de02def7b3ea3ab2bd67bd7 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 25 Feb 2021 15:46:16 -0500 Subject: [PATCH 3/3] fix tests that shouldn't be ignored --- .../sdk/extensions/sql/BeamSqlDslNestedRowsTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java index a8773927a6a6..53ce03af16e0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java @@ -35,9 +35,11 @@ public class BeamSqlDslNestedRowsTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedException exceptions = ExpectedException.none(); + /** + * TODO([BEAM-9378]): This is a test of the incorrect behavior that should not work but does + * because calcite flattens the row. + */ @Test - @Ignore( - "[BEAM-9378] This is a test of the incorrect behavior that should not work but does because calcite flattens the row.") public void testRowConstructorKeyword() { Schema nestedSchema = Schema.builder() @@ -153,9 +155,11 @@ public void testRowConstructorKeywordKeepAsRow() { pipeline.run(); } + /** + * TODO([BEAM-9378] This is a test of the incorrect behavior that should not work but does because + * calcite flattens the row. + */ @Test - @Ignore( - "[BEAM-9378] This is a test of the incorrect behavior that should not work but does because calcite flattens the row.") public void testRowConstructorBraces() { Schema nestedSchema =