Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -34,6 +35,10 @@ public class BeamSqlDslNestedRowsTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
@Rule public ExpectedException exceptions = ExpectedException.none();

/**
* TODO([BEAM-9378]): This is a test of the incorrect behavior that should not work but does
* because calcite flattens the row.
*/
@Test
public void testRowConstructorKeyword() {
Schema nestedSchema =
Expand Down Expand Up @@ -76,6 +81,84 @@ public void testRowConstructorKeyword() {
pipeline.run();
}

@Test
@Ignore("[BEAM-9378] This does not work because calcite flattens the row.")
public void testRowAliasAsRow() {
Schema nestedSchema =
Schema.builder()
.addStringField("f_nestedString")
.addInt32Field("f_nestedInt")
.addInt32Field("f_nestedIntPlusOne")
.build();

Schema inputType =
Schema.builder().addInt32Field("f_int").addRowField("f_row", nestedSchema).build();
Schema outputType =
Schema.builder().addInt32Field("f_int").addRowField("f_row1", nestedSchema).build();

PCollection<Row> input =
pipeline.apply(
Create.of(
Row.withSchema(inputType)
.attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313)))
.withRowSchema(inputType));

PCollection<Row> result =
input
.apply(SqlTransform.query("SELECT 1 as `f_int`, f_row as `f_row1` FROM PCOLLECTION"))
.setRowSchema(outputType);

PAssert.that(result)
.containsInAnyOrder(
Row.withSchema(outputType)
.attachValues(1, Row.withSchema(nestedSchema).attachValues("CC", 312, 313)));

pipeline.run();
}

@Test
@Ignore("[BEAM-9378] This does not work because calcite flattens the row.")
public void testRowConstructorKeywordKeepAsRow() {
Schema nestedSchema =
Schema.builder()
.addStringField("f_nestedString")
.addInt32Field("f_nestedInt")
.addInt32Field("f_nestedIntPlusOne")
.build();

Schema inputType =
Schema.builder().addInt32Field("f_int").addRowField("f_row", nestedSchema).build();
Schema nestedOutput =
Schema.builder().addInt32Field("int_field").addStringField("str_field").build();
Schema outputType =
Schema.builder().addInt32Field("f_int1").addRowField("f_row1", nestedOutput).build();

PCollection<Row> input =
pipeline.apply(
Create.of(
Row.withSchema(inputType)
.attachValues(2, Row.withSchema(nestedSchema).attachValues("CC", 312, 313)))
.withRowSchema(inputType));

PCollection<Row> result =
input
.apply(
SqlTransform.query(
"SELECT f_int as `f_int1`, (`PCOLLECTION`.`f_row`.`f_nestedInt`, `PCOLLECTION`.`f_row`.`f_nestedString`) as `f_row1` FROM PCOLLECTION"))
.setRowSchema(outputType);

PAssert.that(result)
.containsInAnyOrder(
Row.withSchema(nestedSchema)
.attachValues(2, Row.withSchema(nestedOutput).attachValues(312, "CC")));

pipeline.run();
}

/**
* TODO([BEAM-9378] This is a test of the incorrect behavior that should not work but does because
* calcite flattens the row.
*/
@Test
public void testRowConstructorBraces() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2409,6 +2409,24 @@ public void testStructOfStructSimpleRename() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
@Ignore("[BEAM-9378] This should work, but is currently unimplemented.")
public void testStructOfStructRemap() {
String sql =
"SELECT STRUCT(row.row_id AS int_value_remapped) AS remapped FROM table_with_struct_of_struct";

PCollection<Row> stream = execute(sql);

Schema nested = Schema.builder().addInt64Field("int_value_remapped").build();
Schema schema = Schema.builder().addRowField("remapped", nested).build();
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(schema).attachValues(Row.withSchema(nested).attachValues(1L)),
Row.withSchema(schema).attachValues(Row.withSchema(nested).attachValues(2L)));

pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
public void testUnnestStructOfStructOfArray() {
String sql =
Expand Down