Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,48 @@ public static void setup() {

@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();

@Test
public void testMultiInputBroadcast() throws Exception {
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.executeSql("create database db1");
try {
tableEnv.useDatabase("db1");
tableEnv.executeSql("create table src1(key string, val string)");
tableEnv.executeSql("create table src2(key string, val string)");
tableEnv.executeSql("create table dest(key string, val string)");
HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
.addRow(new Object[] {"1", "val1"})
.addRow(new Object[] {"2", "val2"})
.addRow(new Object[] {"3", "val3"})
.commit();
HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src2")
.addRow(new Object[] {"3", "val4"})
.addRow(new Object[] {"4", "val4"})
.commit();
tableEnv.executeSql(
"INSERT OVERWRITE dest\n"
+ "SELECT j.*\n"
+ "FROM (SELECT t1.key, p1.val\n"
+ " FROM src2 t1\n"
+ " LEFT OUTER JOIN src1 p1\n"
+ " ON (t1.key = p1.key)\n"
+ " UNION ALL\n"
+ " SELECT t2.key, p2.val\n"
+ " FROM src2 t2\n"
+ " LEFT OUTER JOIN src1 p2\n"
+ " ON (t2.key = p2.key)) j")
.await();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to check the result ?

List<Row> results =
CollectionUtil.iteratorToList(
tableEnv.executeSql("select * from dest order by key").collect());
assertEquals(
"[+I[3, val3], +I[3, val3], +I[4, null], +I[4, null]]", results.toString());
} finally {
tableEnv.useDatabase("default");
tableEnv.executeSql("drop database db1 cascade");
}
}

@Test
public void testDefaultPartitionName() throws Exception {
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,13 @@ private void createAllOperators(StreamOperatorParameters<RowData> parameters) {
if (outputs.length == 1) {
output = outputs[0];
} else {
// This is the inverse of creating the normal Output.
// In case of object reuse, we need to copy in the broadcast output.
// Because user's operator may change the record passed to it.
if (isObjectReuseEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add some comments for this behavior, just like the comments in OperatorChain#createOutputCollector method

output = new BroadcastingOutput(outputs);
} else {
output = new CopyingBroadcastingOutput(outputs);
} else {
output = new BroadcastingOutput(outputs);
}
}
}
Expand Down