From 9b4ecca5fca37a102ae9baa002684905a70bbc08 Mon Sep 17 00:00:00 2001
From: Rui Li
Date: Thu, 24 Dec 2020 11:39:35 +0800
Subject: [PATCH 1/3] [FLINK-20722][hive] HiveTableSink should copy the record
 when converting RowData to Row

---
 .../flink/connectors/hive/HiveTableSink.java  | 30 ++++++++++++++-
 .../hive/TableEnvHiveConnectorITCase.java     | 37 +++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 9bc7838d0bb94..0a48aa3a5e5b5 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
@@ -37,6 +38,9 @@
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -230,8 +234,14 @@ private DataStreamSink<Row> createBatchSink(
         builder.setTempPath(
                 new org.apache.flink.core.fs.Path(toStagingDir(sd.getLocation(), jobConf)));
         builder.setOutputFileConfig(fileNaming);
+        CopyingRowConverter toRowConverter =
+                new CopyingRowConverter(rowData -> (Row) converter.toExternal(rowData));
         return dataStream
-                .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
+                .transform(
+                        CopyingRowConverter.class.getSimpleName(),
+                        TypeExtractor.getMapReturnTypes(
+                                toRowConverter.getUserFunction(), dataStream.getType()),
+                        toRowConverter)
                 .writeUsingOutputFormat(builder.build())
                 .setParallelism(parallelism);
     }
@@ -501,4 +511,22 @@ public boolean shouldRollOnProcessingTime(
             }
         }
     }
+
+    /** An operator that converts RowData to Row and copies the input StreamRecord. */
+    private static class CopyingRowConverter
+            extends AbstractUdfStreamOperator<Row, MapFunction<RowData, Row>>
+            implements OneInputStreamOperator<RowData, Row> {
+
+        private static final long serialVersionUID = 1L;
+
+        public CopyingRowConverter(MapFunction<RowData, Row> userFunction) {
+            super(userFunction);
+        }
+
+        @Override
+        public void processElement(StreamRecord<RowData> element) throws Exception {
+            StreamRecord<RowData> copy = element.copy(element.getValue());
+            output.collect(copy.replace(userFunction.map(copy.getValue())));
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 13e319a0c497b..4bfe12d36d1dd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -73,6 +73,43 @@ public static void setup() {
 
     @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
 
+    @Test
+    public void testMultiInputBroadcast() throws Exception {
+        TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+        tableEnv.executeSql("create database db1");
+        try {
+            tableEnv.useDatabase("db1");
+            tableEnv.executeSql("create table src1(key string, val string)");
+            tableEnv.executeSql("create table src2(key string, val string)");
+            tableEnv.executeSql("create table dest(key string, val string)");
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
+                    .addRow(new Object[] {"1", "val1"})
+                    .addRow(new Object[] {"2", "val2"})
+                    .addRow(new Object[] {"3", "val3"})
+                    .commit();
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src2")
+                    .addRow(new Object[] {"3", "val4"})
+                    .addRow(new Object[] {"4", "val4"})
+                    .commit();
+            tableEnv.executeSql(
+                            "INSERT OVERWRITE dest\n"
+                                    + "SELECT j.*\n"
+                                    + "FROM (SELECT t1.key, p1.val\n"
+                                    + "      FROM src2 t1\n"
+                                    + "      LEFT OUTER JOIN src1 p1\n"
+                                    + "      ON (t1.key = p1.key)\n"
+                                    + "      UNION ALL\n"
+                                    + "      SELECT t2.key, p2.val\n"
+                                    + "      FROM src2 t2\n"
+                                    + "      LEFT OUTER JOIN src1 p2\n"
+                                    + "      ON (t2.key = p2.key)) j")
+                    .await();
+        } finally {
+            tableEnv.useDatabase("default");
+            tableEnv.executeSql("drop database db1 cascade");
+        }
+    }
+
     @Test
     public void testDefaultPartitionName() throws Exception {
         TableEnvironment tableEnv = getTableEnvWithHiveCatalog();

From 8caec3d0866bf6a67f713c9d37bfd945385e7a13 Mon Sep 17 00:00:00 2001
From: Rui Li
Date: Thu, 15 Apr 2021 10:59:34 +0800
Subject: [PATCH 2/3] use CopyingBroadcastingOutput when object reuse is
 enabled

---
 .../flink/connectors/hive/HiveTableSink.java  | 30 +------------------
 .../MultipleInputStreamOperatorBase.java      |  4 +--
 2 files changed, 3 insertions(+), 31 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 0a48aa3a5e5b5..9bc7838d0bb94 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
@@ -38,9 +37,6 @@
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -234,14 +230,8 @@ private DataStreamSink<Row> createBatchSink(
         builder.setTempPath(
                 new org.apache.flink.core.fs.Path(toStagingDir(sd.getLocation(), jobConf)));
         builder.setOutputFileConfig(fileNaming);
-        CopyingRowConverter toRowConverter =
-                new CopyingRowConverter(rowData -> (Row) converter.toExternal(rowData));
         return dataStream
-                .transform(
-                        CopyingRowConverter.class.getSimpleName(),
-                        TypeExtractor.getMapReturnTypes(
-                                toRowConverter.getUserFunction(), dataStream.getType()),
-                        toRowConverter)
+                .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
                 .writeUsingOutputFormat(builder.build())
                 .setParallelism(parallelism);
     }
@@ -511,22 +501,4 @@ public boolean shouldRollOnProcessingTime(
             }
         }
     }
-
-    /** An operator that converts RowData to Row and copies the input StreamRecord. */
-    private static class CopyingRowConverter
-            extends AbstractUdfStreamOperator<Row, MapFunction<RowData, Row>>
-            implements OneInputStreamOperator<RowData, Row> {
-
-        private static final long serialVersionUID = 1L;
-
-        public CopyingRowConverter(MapFunction<RowData, Row> userFunction) {
-            super(userFunction);
-        }
-
-        @Override
-        public void processElement(StreamRecord<RowData> element) throws Exception {
-            StreamRecord<RowData> copy = element.copy(element.getValue());
-            output.collect(copy.replace(userFunction.map(copy.getValue())));
-        }
-    }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index 6a613be8f5c13..11ee48cd5879e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -245,9 +245,9 @@ private void createAllOperators(StreamOperatorParameters<RowData> parameters) {
                 output = outputs[0];
             } else {
                 if (isObjectReuseEnabled) {
-                    output = new BroadcastingOutput(outputs);
-                } else {
                     output = new CopyingBroadcastingOutput(outputs);
+                } else {
+                    output = new BroadcastingOutput(outputs);
                 }
             }
         }

From d95bcdeacb60d70108bcb9fe880ad212e9f8a62e Mon Sep 17 00:00:00 2001
From: Rui Li
Date: Tue, 20 Apr 2021 14:45:37 +0800
Subject: [PATCH 3/3] address comments

---
 .../flink/connectors/hive/TableEnvHiveConnectorITCase.java  | 5 +++++
 .../multipleinput/MultipleInputStreamOperatorBase.java      | 3 +++
 2 files changed, 8 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 4bfe12d36d1dd..9af50b44ddf4e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -104,6 +104,11 @@ public void testMultiInputBroadcast() throws Exception {
                                     + "      LEFT OUTER JOIN src1 p2\n"
                                     + "      ON (t2.key = p2.key)) j")
                     .await();
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from dest order by key").collect());
+            assertEquals(
+                    "[+I[3, val3], +I[3, val3], +I[4, null], +I[4, null]]", results.toString());
         } finally {
             tableEnv.useDatabase("default");
             tableEnv.executeSql("drop database db1 cascade");
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index 11ee48cd5879e..32863f8a35945 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -244,6 +244,9 @@ private void createAllOperators(StreamOperatorParameters<RowData> parameters) {
             if (outputs.length == 1) {
                 output = outputs[0];
             } else {
+                // This is the inverse of creating the normal Output.
+                // In case of object reuse, we need to copy in the broadcast output,
+                // because the user's operator may change the record passed to it.
                 if (isObjectReuseEnabled) {
                     output = new CopyingBroadcastingOutput(outputs);
                 } else {
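
Note on the fix: with object reuse enabled, BroadcastingOutput hands the same
StreamRecord wrapper to every downstream operator, so a branch that calls
replace() on that shared wrapper changes what its sibling branches see; the
UNION ALL over two joins in the new test is designed to provoke exactly this.
CopyingBroadcastingOutput instead gives each branch its own copy of the
wrapper. The sketch below is a minimal, self-contained illustration of that
hazard; the Record class and the Consumer-based branches are simplified
stand-ins, not the actual Flink classes.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: models StreamRecord-style wrappers and broadcasting outputs.
public class CopyingBroadcastDemo {

    /** A mutable wrapper, like StreamRecord: the value can be replaced in place. */
    static final class Record {
        Object value;

        Record(Object value) {
            this.value = value;
        }

        /** Replaces the wrapped value in place, as StreamRecord.replace does. */
        Record replace(Object newValue) {
            this.value = newValue;
            return this;
        }

        /** Shallow copy of the wrapper. */
        Record copy() {
            return new Record(value);
        }
    }

    /** Unsafe under object reuse: every branch receives the same wrapper. */
    static void broadcast(Record r, List<Consumer<Record>> branches) {
        for (Consumer<Record> branch : branches) {
            branch.accept(r);
        }
    }

    /** The copying variant: each branch gets its own wrapper. */
    static void copyingBroadcast(Record r, List<Consumer<Record>> branches) {
        for (Consumer<Record> branch : branches) {
            branch.accept(r.copy());
        }
    }

    public static void main(String[] args) {
        List<Object> seenBySecondBranch = new ArrayList<>();
        List<Consumer<Record>> branches = new ArrayList<>();
        // The first branch converts its input in place, like the RowData-to-Row map.
        branches.add(r -> r.replace("converted:" + r.value));
        branches.add(r -> seenBySecondBranch.add(r.value));

        broadcast(new Record("val3"), branches);
        System.out.println(seenBySecondBranch); // [converted:val3], corrupted

        seenBySecondBranch.clear();
        copyingBroadcast(new Record("val3"), branches);
        System.out.println(seenBySecondBranch); // [val3], isolated per branch
    }
}

This also explains why the branching is "the inverse of creating the normal
Output": when object reuse is disabled, the downstream chaining outputs
already copy every record, so the plain BroadcastingOutput is safe; when
reuse is enabled, nothing else copies, so the broadcast itself must.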