From 980b08fff0f53d155dae142fc120132469630b8e Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Sun, 1 Dec 2024 10:09:10 +0800 Subject: [PATCH 1/2] [flink] Replace legacy SinkFunction with v2 Sink --- .../connector/sink2/WriterInitContext.java | 22 +++++ .../api/functions/sink/v2/DiscardingSink.java | 58 +++++++++++++ .../connector/sink2/WriterInitContext.java | 22 +++++ .../api/functions/sink/v2/DiscardingSink.java | 58 +++++++++++++ .../connector/sink2/WriterInitContext.java | 25 ++++++ .../api/functions/sink/v2/DiscardingSink.java | 59 +++++++++++++ .../connector/sink2/WriterInitContext.java | 22 +++++ .../api/functions/sink/v2/DiscardingSink.java | 59 +++++++++++++ .../sink/cdc/FlinkCdcMultiTableSink.java | 4 +- .../sink/cdc/FlinkCdcMultiTableSinkTest.java | 5 +- .../paimon/flink/action/CloneAction.java | 4 +- .../flink/service/QueryAddressRegister.java | 84 +++++++++++-------- .../paimon/flink/service/QueryService.java | 2 +- .../sink/CombinedTableCompactorSink.java | 4 +- .../apache/paimon/flink/sink/FlinkSink.java | 4 +- 15 files changed, 383 insertions(+), 49 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java create mode 100644 paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java create mode 100644 paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java create mode 100644 paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java create mode 100644 paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java new file mode 100644 index 000000000000..563dbbe75e7e --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +/** Placeholder class to resolve compatibility issues. */ +public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {} diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java new file mode 100644 index 000000000000..98aaf6418ff7 --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.v2; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; + +import java.io.IOException; + +/** + * A special sink that ignores all elements. + * + * @param The type of elements received by the sink. + */ +@PublicEvolving +public class DiscardingSink implements Sink { + private static final long serialVersionUID = 1L; + + @Override + public SinkWriter createWriter(InitContext context) throws IOException { + return new DiscardingElementWriter(); + } + + private class DiscardingElementWriter implements SinkWriter { + + @Override + public void write(IN element, Context context) throws IOException, InterruptedException { + // discard it. + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + // this writer has no pending data. + } + + @Override + public void close() throws Exception { + // do nothing. + } + } +} diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java new file mode 100644 index 000000000000..563dbbe75e7e --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +/** Placeholder class to resolve compatibility issues. */ +public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {} diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java new file mode 100644 index 000000000000..98aaf6418ff7 --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.v2; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; + +import java.io.IOException; + +/** + * A special sink that ignores all elements. + * + * @param The type of elements received by the sink. + */ +@PublicEvolving +public class DiscardingSink implements Sink { + private static final long serialVersionUID = 1L; + + @Override + public SinkWriter createWriter(InitContext context) throws IOException { + return new DiscardingElementWriter(); + } + + private class DiscardingElementWriter implements SinkWriter { + + @Override + public void write(IN element, Context context) throws IOException, InterruptedException { + // discard it. + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + // this writer has no pending data. + } + + @Override + public void close() throws Exception { + // do nothing. + } + } +} diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java new file mode 100644 index 000000000000..db4500042572 --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +import org.apache.flink.annotation.Public; + +/** Placeholder class to resolve compatibility issues. */ +@Public +public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {} diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java new file mode 100644 index 000000000000..fc7eb0d48356 --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.v2; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; + +import java.io.IOException; + +/** + * A special sink that ignores all elements. + * + * @param The type of elements received by the sink. + */ +@PublicEvolving +public class DiscardingSink implements Sink, SupportsConcurrentExecutionAttempts { + private static final long serialVersionUID = 1L; + + @Override + public SinkWriter createWriter(InitContext context) throws IOException { + return new DiscardingElementWriter(); + } + + private class DiscardingElementWriter implements SinkWriter { + + @Override + public void write(IN element, Context context) throws IOException, InterruptedException { + // discard it. + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + // this writer has no pending data. + } + + @Override + public void close() throws Exception { + // do nothing. + } + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java new file mode 100644 index 000000000000..563dbbe75e7e --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2; + +/** Placeholder class to resolve compatibility issues. */ +public interface WriterInitContext extends org.apache.flink.api.connector.sink2.Sink.InitContext {} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java new file mode 100644 index 000000000000..fc7eb0d48356 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/streaming/api/functions/sink/v2/DiscardingSink.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.v2; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; + +import java.io.IOException; + +/** + * A special sink that ignores all elements. + * + * @param The type of elements received by the sink. + */ +@PublicEvolving +public class DiscardingSink implements Sink, SupportsConcurrentExecutionAttempts { + private static final long serialVersionUID = 1L; + + @Override + public SinkWriter createWriter(InitContext context) throws IOException { + return new DiscardingElementWriter(); + } + + private class DiscardingElementWriter implements SinkWriter { + + @Override + public void write(IN element, Context context) throws IOException, InterruptedException { + // discard it. + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + // this writer has no pending data. + } + + @Override + public void close() throws Exception { + // do nothing. + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index f9b7bbc6b910..1688d4deb088 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; @@ -134,7 +134,7 @@ public DataStreamSink sinkFrom( createCommittableStateManager())) .setParallelism(input.getParallelism()); configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory); - return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); + return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); } protected OneInputStreamOperatorFactory diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java index e1bd112ca751..723f57a30e3f 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java @@ -22,11 +22,11 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.options.Options; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; -import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.junit.jupiter.api.Test; @@ -64,8 +64,7 @@ public void cancel() {} DataStreamSink dataStreamSink = sink.sinkFrom(input); // check the transformation graph - LegacySinkTransformation end = - (LegacySinkTransformation) dataStreamSink.getTransformation(); + Transformation end = dataStreamSink.getTransformation(); assertThat(end.getName()).isEqualTo("end"); OneInputTransformation committer = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java index 2f90147eeb2a..bac030dd0496 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import java.util.HashMap; import java.util.Map; @@ -141,7 +141,7 @@ copyFiles, new SnapshotHintChannelComputer(), parallelism) new SnapshotHintOperator(targetCatalogConfig)) .setParallelism(parallelism); - snapshotHintOperator.addSink(new DiscardingSink<>()).name("end").setParallelism(1); + snapshotHintOperator.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java index 524f2e5f01c1..00d527506cfe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java @@ -23,10 +23,9 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.flink.api.common.functions.OpenContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import java.net.InetSocketAddress; import java.util.TreeMap; @@ -34,57 +33,68 @@ import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP; /** Operator for address server to register addresses to {@link ServiceManager}. */ -public class QueryAddressRegister extends RichSinkFunction { - +public class QueryAddressRegister implements Sink { private final ServiceManager serviceManager; - private transient int numberExecutors; - private transient TreeMap executors; - public QueryAddressRegister(Table table) { this.serviceManager = ((FileStoreTable) table).store().newServiceManager(); } /** - * Do not annotate with @override here to maintain compatibility with Flink 1.18-. + * Do not annotate with @override here to maintain compatibility with Flink 2.0+. */ - public void open(OpenContext openContext) throws Exception { - open(new Configuration()); + public SinkWriter createWriter(InitContext context) { + return new QueryAddressRegisterSinkWriter(serviceManager); } /** - * Do not annotate with @override here to maintain compatibility with Flink 2.0+. + * Do not annotate with @override here to maintain compatibility with Flink 1.18-. */ - public void open(Configuration parameters) throws Exception { - this.executors = new TreeMap<>(); + public SinkWriter createWriter(WriterInitContext context) { + return new QueryAddressRegisterSinkWriter(serviceManager); } - @Override - public void invoke(InternalRow row, SinkFunction.Context context) { - int numberExecutors = row.getInt(0); - if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) { - throw new IllegalArgumentException( - String.format( - "Number Executors can not be changed! Old %s , New %s .", - this.numberExecutors, numberExecutors)); - } - this.numberExecutors = numberExecutors; + private static class QueryAddressRegisterSinkWriter implements SinkWriter { + private final ServiceManager serviceManager; - int executorId = row.getInt(1); - String hostname = row.getString(2).toString(); - int port = row.getInt(3); + private final TreeMap executors; - executors.put(executorId, new InetSocketAddress(hostname, port)); + private int numberExecutors; - if (executors.size() == numberExecutors) { - serviceManager.resetService( - PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0])); + private QueryAddressRegisterSinkWriter(ServiceManager serviceManager) { + this.serviceManager = serviceManager; + this.executors = new TreeMap<>(); } - } - @Override - public void close() throws Exception { - super.close(); - serviceManager.deleteService(PRIMARY_KEY_LOOKUP); + @Override + public void write(InternalRow row, Context context) { + int numberExecutors = row.getInt(0); + if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) { + throw new IllegalArgumentException( + String.format( + "Number Executors can not be changed! Old %s , New %s .", + this.numberExecutors, numberExecutors)); + } + this.numberExecutors = numberExecutors; + + int executorId = row.getInt(1); + String hostname = row.getString(2).toString(); + int port = row.getInt(3); + + executors.put(executorId, new InetSocketAddress(hostname, port)); + + if (executors.size() == numberExecutors) { + serviceManager.resetService( + PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0])); + } + } + + @Override + public void flush(boolean endOfInput) {} + + @Override + public void close() { + serviceManager.deleteService(PRIMARY_KEY_LOOKUP); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java index bd433fe0f00d..752d54cff5a0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java @@ -62,7 +62,7 @@ public static void build(StreamExecutionEnvironment env, Table table, int parall InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()), executorOperator) .setParallelism(parallelism) - .addSink(new QueryAddressRegister(table)) + .sinkTo(new QueryAddressRegister(table)) .setParallelism(1); sink.getTransformation().setMaxParallelism(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index c2b4cc0f87e6..25f76ce97683 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.table.data.RowData; @@ -171,7 +171,7 @@ protected DataStreamSink doCommit( if (!options.get(SINK_COMMITTER_OPERATOR_CHAINING)) { committed = committed.startNewChain(); } - return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); + return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); } // TODO:refactor FlinkSink to adopt this sink diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 8d6c3554c76f..002f5887b5f0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -43,7 +43,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -316,7 +316,7 @@ protected DataStreamSink doCommit(DataStream written, String com } configureGlobalCommitter( committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY)); - return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); + return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); } public static void configureGlobalCommitter( From ba10887c1a5b7c0e9b9b9d0ee2d67fe67a6df1a8 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 2 Dec 2024 11:57:56 +0800 Subject: [PATCH 2/2] Recognize name conflict between writer and v2 sink --- .../apache/paimon/flink/sink/WriterChainingStrategyTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterChainingStrategyTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterChainingStrategyTest.java index a4605b830918..24fb529b59ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterChainingStrategyTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterChainingStrategyTest.java @@ -173,7 +173,7 @@ private List verifyChaining( List vertices = new ArrayList<>(); env.getStreamGraph().getJobGraph().getVertices().forEach(vertices::add); - JobVertex vertex = findVertex(vertices, "Writer"); + JobVertex vertex = findVertex(vertices, "Writer(write-only)"); if (isWriterChainedWithUpstream) { assertThat(vertex.toString()).contains("Source");