diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 13e319a0c497b..9af50b44ddf4e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -73,6 +73,48 @@ public static void setup() {
 
     @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
 
+    @Test
+    public void testMultiInputBroadcast() throws Exception {
+        TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+        tableEnv.executeSql("create database db1");
+        try {
+            tableEnv.useDatabase("db1");
+            tableEnv.executeSql("create table src1(key string, val string)");
+            tableEnv.executeSql("create table src2(key string, val string)");
+            tableEnv.executeSql("create table dest(key string, val string)");
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
+                    .addRow(new Object[] {"1", "val1"})
+                    .addRow(new Object[] {"2", "val2"})
+                    .addRow(new Object[] {"3", "val3"})
+                    .commit();
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src2")
+                    .addRow(new Object[] {"3", "val4"})
+                    .addRow(new Object[] {"4", "val4"})
+                    .commit();
+            tableEnv.executeSql(
+                            "INSERT OVERWRITE dest\n"
+                                    + "SELECT j.*\n"
+                                    + "FROM (SELECT t1.key, p1.val\n"
+                                    + "      FROM src2 t1\n"
+                                    + "      LEFT OUTER JOIN src1 p1\n"
+                                    + "      ON (t1.key = p1.key)\n"
+                                    + "      UNION ALL\n"
+                                    + "      SELECT t2.key, p2.val\n"
+                                    + "      FROM src2 t2\n"
+                                    + "      LEFT OUTER JOIN src1 p2\n"
+                                    + "      ON (t2.key = p2.key)) j")
+                    .await();
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from dest order by key").collect());
+            assertEquals(
+                    "[+I[3, val3], +I[3, val3], +I[4, null], +I[4, null]]", results.toString());
+        } finally {
+            tableEnv.useDatabase("default");
+            tableEnv.executeSql("drop database db1 cascade");
+        }
+    }
+
     @Test
     public void testDefaultPartitionName() throws Exception {
         TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index 6a613be8f5c13..32863f8a35945 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -244,10 +244,13 @@ private void createAllOperators(StreamOperatorParameters parameters) {
             if (outputs.length == 1) {
                 output = outputs[0];
             } else {
+                // Note that this is the inverse of how the normal Output is created: when
+                // object reuse is enabled, an operator may mutate the records it receives,
+                // so the broadcast output must hand each downstream input its own copy.
                 if (isObjectReuseEnabled) {
-                    output = new BroadcastingOutput(outputs);
-                } else {
                     output = new CopyingBroadcastingOutput(outputs);
+                } else {
+                    output = new BroadcastingOutput(outputs);
                 }
             }
 
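
To make the fix above concrete, here is a minimal, self-contained sketch of the failure mode it addresses. The `Record` and `Output` types below are hypothetical stand-ins, not Flink's actual classes; the two factory methods only mirror what `BroadcastingOutput` and `CopyingBroadcastingOutput` do with a record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BroadcastCopySketch {

    // Hypothetical stand-in for a mutable, reused record (e.g. a row object).
    static final class Record {
        String value;

        Record(String value) {
            this.value = value;
        }

        Record copy() {
            return new Record(value);
        }
    }

    interface Output {
        void collect(Record record);
    }

    // Mirrors BroadcastingOutput: every downstream input sees the same instance.
    static Output broadcasting(List<Output> outputs) {
        return record -> outputs.forEach(out -> out.collect(record));
    }

    // Mirrors CopyingBroadcastingOutput: every downstream input gets its own copy.
    static Output copyingBroadcasting(List<Output> outputs) {
        return record -> outputs.forEach(out -> out.collect(record.copy()));
    }

    public static void main(String[] args) {
        List<Record> seenByA = new ArrayList<>();
        Output a = seenByA::add;
        // b mutates the record in place, which an operator is allowed to do
        // when object reuse is enabled.
        Output b = record -> record.value = "mutated";

        broadcasting(Arrays.asList(a, b)).collect(new Record("val3"));
        copyingBroadcasting(Arrays.asList(a, b)).collect(new Record("val3"));

        // Without copying, b's mutation corrupts the record a already collected.
        System.out.println(seenByA.get(0).value); // prints "mutated"
        System.out.println(seenByA.get(1).value); // prints "val3"
    }
}
```

This is the pattern the new `testMultiInputBroadcast` case exercises: the `UNION ALL` over two joins against the same sources lets the planner fan one input out to two downstream operators inside a multiple-input operator, where the old branch selection used the non-copying `BroadcastingOutput` precisely when object reuse made copying necessary.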