diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java index db6cc1421b21..53ce03af16e0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslNestedRowsTest.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -34,6 +35,10 @@ public class BeamSqlDslNestedRowsTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedException exceptions = ExpectedException.none(); + /** + * TODO([BEAM-9378]): This is a test of the incorrect behavior that should not work but does + * because calcite flattens the row. + */ @Test public void testRowConstructorKeyword() { Schema nestedSchema = @@ -76,6 +81,84 @@ public void testRowConstructorKeyword() { pipeline.run(); } + @Test + @Ignore("[BEAM-9378] This does not work because calcite flattens the row.") + public void testRowAliasAsRow() { + Schema nestedSchema = + Schema.builder() + .addStringField("f_nestedString") + .addInt32Field("f_nestedInt") + .addInt32Field("f_nestedIntPlusOne") + .build(); + + Schema inputType = + Schema.builder().addInt32Field("f_int").addRowField("f_row", nestedSchema).build(); + Schema outputType = + Schema.builder().addInt32Field("f_int").addRowField("f_row1", nestedSchema).build(); + + PCollection input = + pipeline.apply( + Create.of( + Row.withSchema(inputType) + .attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) + .withRowSchema(inputType)); + + PCollection result = + input + .apply(SqlTransform.query("SELECT 1 as `f_int`, f_row as `f_row1` FROM PCOLLECTION")) + .setRowSchema(outputType); + + PAssert.that(result) + .containsInAnyOrder( + Row.withSchema(outputType) + .attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))); + + pipeline.run(); + } + + @Test + @Ignore("[BEAM-9378] This does not work because calcite flattens the row.") + public void testRowConstructorKeywordKeepAsRow() { + Schema nestedSchema = + Schema.builder() + .addStringField("f_nestedString") + .addInt32Field("f_nestedInt") + .addInt32Field("f_nestedIntPlusOne") + .build(); + + Schema inputType = + Schema.builder().addInt32Field("f_int").addRowField("f_row", nestedSchema).build(); + Schema nestedOutput = + Schema.builder().addInt32Field("int_field").addStringField("str_field").build(); + Schema outputType = + Schema.builder().addInt32Field("f_int1").addRowField("f_row1", nestedOutput).build(); + + PCollection input = + pipeline.apply( + Create.of( + Row.withSchema(inputType) + .attachValues(2, Row.withSchema(nestedSchema).attachValues("CC", 312, 313))) + .withRowSchema(inputType)); + + PCollection result = + input + .apply( + SqlTransform.query( + "SELECT f_int as `f_int1`, (`PCOLLECTION`.`f_row`.`f_nestedInt`, `PCOLLECTION`.`f_row`.`f_nestedString`) as `f_row1` FROM PCOLLECTION")) + .setRowSchema(outputType); + + PAssert.that(result) + .containsInAnyOrder( + Row.withSchema(nestedSchema) + .attachValues(2, Row.withSchema(nestedOutput).attachValues(312, "CC"))); + + pipeline.run(); + } + + /** + * TODO([BEAM-9378] This is a test of the incorrect behavior that should not work but does because + * calcite flattens the row. + */ @Test public void testRowConstructorBraces() { diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java index 9f4d1b90bb35..f46d7b61938c 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java @@ -2409,6 +2409,24 @@ public void testStructOfStructSimpleRename() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test + @Ignore("[BEAM-9378] This should work, but is currently unimplemented.") + public void testStructOfStructRemap() { + String sql = + "SELECT STRUCT(row.row_id AS int_value_remapped) AS remapped FROM table_with_struct_of_struct"; + + PCollection stream = execute(sql); + + Schema nested = Schema.builder().addInt64Field("int_value_remapped").build(); + Schema schema = Schema.builder().addRowField("remapped", nested).build(); + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(schema).attachValues(Row.withSchema(nested).attachValues(1L)), + Row.withSchema(schema).attachValues(Row.withSchema(nested).attachValues(2L))); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + @Test public void testUnnestStructOfStructOfArray() { String sql =