list) {}
+
+ @Override
+ public void notifyNoMoreSplits() {}
+
+ @Override
+ public void close() throws Exception {}
+}
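For orientation: a concrete reader built on this base class (presumably AbstractNonCoordinatedSourceReader, judging by the imports used later in this patch) only has to implement pollNext, since the split and checkpoint hooks above are already no-ops. A minimal, hypothetical sketch:

    // Hypothetical reader built on the patch's base class: emits one record, then finishes.
    import org.apache.flink.api.connector.source.ReaderOutput;
    import org.apache.flink.core.io.InputStatus;

    public class SingleValueReader extends AbstractNonCoordinatedSourceReader<String> {
        private boolean emitted = false;

        @Override
        public InputStatus pollNext(ReaderOutput<String> output) {
            if (!emitted) {
                output.collect("hello");
                emitted = true;
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }
    }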
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
index 79ee827fe6e4..7954aad2df0a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java
@@ -21,19 +21,19 @@
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
-import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,15 +42,16 @@
import java.util.List;
/**
- * Source Function for unaware-bucket Compaction.
+ * Source for unaware-bucket Compaction.
*
- * Note: The function is the source function of unaware-bucket compactor coordinator. It will
- * read the latest snapshot continuously by compactionCoordinator, and generate new compaction
- * tasks. The source function is used in unaware-bucket compaction job (both stand-alone and
- * write-combined). Besides, we don't need to save state in this function, it will invoke a full
- * scan when starting up, and scan continuously for the following snapshot.
+ * <p>Note: The function is the source of unaware-bucket compactor coordinator. It will read the
+ * latest snapshot continuously by compactionCoordinator, and generate new compaction tasks. The
+ * source is used in unaware-bucket compaction job (both stand-alone and write-combined). Besides,
+ * we don't need to save state in this source, it will invoke a full scan when starting up, and scan
+ * continuously for the following snapshot.
*/
-public class BucketUnawareCompactSource extends RichSourceFunction<UnawareAppendCompactionTask> {
+public class BucketUnawareCompactSource
+ extends AbstractNonCoordinatedSource<UnawareAppendCompactionTask> {
private static final Logger LOG = LoggerFactory.getLogger(BucketUnawareCompactSource.class);
private static final String COMPACTION_COORDINATOR_NAME = "Compaction Coordinator";
@@ -59,9 +60,6 @@ public class BucketUnawareCompactSource extends RichSourceFunction<UnawareAppendCompactionTask> {
- private transient SourceContext<UnawareAppendCompactionTask> ctx;
- private volatile boolean isRunning = true;
public BucketUnawareCompactSource(
FileStoreTable table,
@@ -74,76 +72,64 @@ public BucketUnawareCompactSource(
this.filter = filter;
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) throws Exception {
- open(new Configuration());
+ @Override
+ public Boundedness getBoundedness() {
+ return streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) throws Exception {
- compactionCoordinator =
- new UnawareAppendTableCompactionCoordinator(table, streaming, filter);
+ @Override
+ public SourceReader<UnawareAppendCompactionTask, SimpleSourceSplit> createReader(
+ SourceReaderContext readerContext) throws Exception {
Preconditions.checkArgument(
- RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()) == 1,
+ readerContext.currentParallelism() == 1,
"Compaction Operator parallelism in paimon MUST be one.");
+ return new BucketUnawareCompactSourceReader(table, streaming, filter, scanInterval);
}
- @Override
- public void run(SourceContext sourceContext) throws Exception {
- this.ctx = sourceContext;
- while (isRunning) {
+ /** BucketUnawareCompactSourceReader. */
+ public static class BucketUnawareCompactSourceReader
+ extends AbstractNonCoordinatedSourceReader<UnawareAppendCompactionTask> {
+ private final UnawareAppendTableCompactionCoordinator compactionCoordinator;
+ private final long scanInterval;
+
+ public BucketUnawareCompactSourceReader(
+ FileStoreTable table, boolean streaming, Predicate filter, long scanInterval) {
+ this.scanInterval = scanInterval;
+ compactionCoordinator =
+ new UnawareAppendTableCompactionCoordinator(table, streaming, filter);
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<UnawareAppendCompactionTask> readerOutput)
+ throws Exception {
boolean isEmpty;
- synchronized (ctx.getCheckpointLock()) {
- if (!isRunning) {
- return;
- }
- try {
- // do scan and plan action, emit append-only compaction tasks.
- List tasks = compactionCoordinator.run();
- isEmpty = tasks.isEmpty();
- tasks.forEach(ctx::collect);
- } catch (EndOfScanException esf) {
- LOG.info("Catching EndOfStreamException, the stream is finished.");
- return;
- }
+ try {
+ // do scan and plan action, emit append-only compaction tasks.
+ List<UnawareAppendCompactionTask> tasks = compactionCoordinator.run();
+ isEmpty = tasks.isEmpty();
+ tasks.forEach(readerOutput::collect);
+ } catch (EndOfScanException esf) {
+ LOG.info("Catching EndOfStreamException, the stream is finished.");
+ return InputStatus.END_OF_INPUT;
}
if (isEmpty) {
Thread.sleep(scanInterval);
}
- }
- }
-
- @Override
- public void cancel() {
- if (ctx != null) {
- synchronized (ctx.getCheckpointLock()) {
- isRunning = false;
- }
- } else {
- isRunning = false;
+ return InputStatus.MORE_AVAILABLE;
}
}
public static DataStreamSource buildSource(
StreamExecutionEnvironment env,
BucketUnawareCompactSource source,
- boolean streaming,
String tableIdentifier) {
- final StreamSource sourceOperator =
- new StreamSource<>(source);
return (DataStreamSource)
- new DataStreamSource<>(
- env,
- new CompactionTaskTypeInfo(),
- sourceOperator,
- false,
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier,
- streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED)
+ new CompactionTaskTypeInfo())
.setParallelism(1)
.setMaxParallelism(1);
}
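Usage sketch for the rewritten source: it is now attached to the job through Flink's unified Source API instead of a StreamSource operator. The constructor arguments below are assumed from the fields referenced above (table, streaming flag, scan interval, filter), so treat the snippet as illustrative rather than the exact signature:

    // Hypothetical wiring of the compaction-coordinator source.
    BucketUnawareCompactSource source =
            new BucketUnawareCompactSource(table, isStreaming, scanIntervalMillis, filter);
    DataStreamSource<UnawareAppendCompactionTask> compactionTasks =
            BucketUnawareCompactSource.buildSource(env, source, table.name());

buildSource() goes through env.fromSource(...) with CompactionTaskTypeInfo and pins both parallelism and max parallelism to 1, which matches the parallelism check in createReader().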
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
index e5cbbe845ceb..415eddb037df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java
@@ -21,10 +21,10 @@
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.LogicalTypeConversion;
-import org.apache.paimon.flink.source.operator.CombinedAwareBatchSourceFunction;
-import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSourceFunction;
-import org.apache.paimon.flink.source.operator.CombinedUnawareBatchSourceFunction;
-import org.apache.paimon.flink.source.operator.CombinedUnawareStreamingSourceFunction;
+import org.apache.paimon.flink.source.operator.CombinedAwareBatchSource;
+import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSource;
+import org.apache.paimon.flink.source.operator.CombinedUnawareBatchSource;
+import org.apache.paimon.flink.source.operator.CombinedUnawareStreamingSource;
import org.apache.paimon.table.system.CompactBucketsTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -87,7 +87,7 @@ public DataStream buildAwareBucketTableSource() {
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
RowType produceType = CompactBucketsTable.getRowType();
if (isContinuous) {
- return CombinedAwareStreamingSourceFunction.buildSource(
+ return CombinedAwareStreamingSource.buildSource(
env,
"Combine-MultiBucketTables--StreamingCompactorSource",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
@@ -97,7 +97,7 @@ public DataStream buildAwareBucketTableSource() {
databasePattern,
monitorInterval);
} else {
- return CombinedAwareBatchSourceFunction.buildSource(
+ return CombinedAwareBatchSource.buildSource(
env,
"Combine-MultiBucketTables-BatchCompactorSource",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)),
@@ -112,7 +112,7 @@ public DataStream buildAwareBucketTableSource() {
public DataStream buildForUnawareBucketsTableSource() {
Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null.");
if (isContinuous) {
- return CombinedUnawareStreamingSourceFunction.buildSource(
+ return CombinedUnawareStreamingSource.buildSource(
env,
"Combined-UnawareBucketTables-StreamingCompactorSource",
catalogLoader,
@@ -121,7 +121,7 @@ public DataStream buildForUnawareBucketsT
databasePattern,
monitorInterval);
} else {
- return CombinedUnawareBatchSourceFunction.buildSource(
+ return CombinedUnawareBatchSource.buildSource(
env,
"Combined-UnawareBucketTables-BatchCompactorSource",
catalogLoader,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index e864ec050045..b85d5274b241 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -26,7 +26,7 @@
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
-import org.apache.paimon.flink.source.operator.MonitorFunction;
+import org.apache.paimon.flink.source.operator.MonitorSource;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -306,7 +306,7 @@ private DataStream buildContinuousStreamOperator() {
"Cannot limit streaming source, please use batch execution mode.");
}
dataStream =
- MonitorFunction.buildSource(
+ MonitorSource.buildSource(
env,
sourceName,
produceTypeInfo(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumState.java
new file mode 100644
index 000000000000..f07317c155aa
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+/** The enumerator state class for {@link NoOpEnumerator}. */
+public class NoOpEnumState {}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumStateSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumStateSerializer.java
new file mode 100644
index 000000000000..89c0ad6ac1f1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumStateSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/** {@link SimpleVersionedSerializer} for {@link NoOpEnumState}. */
+public class NoOpEnumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(NoOpEnumState obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
+ return new NoOpEnumState();
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumerator.java
new file mode 100644
index 000000000000..f29c6d6db76d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NoOpEnumerator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link SplitEnumerator} that provides no functionality. It is basically used for sources that
+ * do not require a coordinator.
+ */
+public class NoOpEnumerator<SplitT extends SourceSplit>
+ implements SplitEnumerator<SplitT, NoOpEnumState> {
+ @Override
+ public void start() {}
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
+
+ @Override
+ public void addSplitsBack(List<SplitT> splits, int subtaskId) {}
+
+ @Override
+ public void addReader(int subtaskId) {}
+
+ @Override
+ public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+ return new NoOpEnumState();
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
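The AbstractNonCoordinatedSource base class that consumes these no-op pieces is not part of this excerpt, so the sketch below only illustrates how a coordinator-free Source could plug them into Flink's Source interface; the method bodies are assumptions, not the actual Paimon implementation. SimpleSourceSplit and SimpleSourceSplitSerializer are the new classes added in the adjacent files of this patch.

    import org.apache.flink.api.connector.source.Source;
    import org.apache.flink.api.connector.source.SplitEnumerator;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    // Sketch: every "non-coordinated" source can hand Flink the same do-nothing enumerator.
    public abstract class NonCoordinatedSourceSketch<T>
            implements Source<T, SimpleSourceSplit, NoOpEnumState> {

        @Override
        public SplitEnumerator<SimpleSourceSplit, NoOpEnumState> createEnumerator(
                SplitEnumeratorContext<SimpleSourceSplit> context) {
            return new NoOpEnumerator<>();
        }

        @Override
        public SplitEnumerator<SimpleSourceSplit, NoOpEnumState> restoreEnumerator(
                SplitEnumeratorContext<SimpleSourceSplit> context, NoOpEnumState checkpoint) {
            // The enumerator is stateless, so restoring it is the same as creating it.
            return new NoOpEnumerator<>();
        }

        @Override
        public SimpleVersionedSerializer<SimpleSourceSplit> getSplitSerializer() {
            return new SimpleSourceSplitSerializer();
        }

        @Override
        public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
            return new NoOpEnumStateSerializer();
        }
    }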
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplit.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplit.java
new file mode 100644
index 000000000000..2db0868f8e34
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplit.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.UUID;
+
+/** A {@link SourceSplit} that provides basic information through splitId. */
+public class SimpleSourceSplit implements SourceSplit {
+ private final String splitId;
+ private final String value;
+
+ public SimpleSourceSplit() {
+ this("");
+ }
+
+ public SimpleSourceSplit(String value) {
+ this(UUID.randomUUID().toString(), value);
+ }
+
+ public SimpleSourceSplit(String splitId, String value) {
+ this.splitId = splitId;
+ this.value = value;
+ }
+
+ @Override
+ public String splitId() {
+ return splitId;
+ }
+
+ public String value() {
+ return value;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplitSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplitSerializer.java
new file mode 100644
index 000000000000..3387afed1c2a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SimpleSourceSplitSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** {@link SimpleVersionedSerializer} for {@link SimpleSourceSplit}. */
+public class SimpleSourceSplitSerializer implements SimpleVersionedSerializer<SimpleSourceSplit> {
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(SimpleSourceSplit split) throws IOException {
+ if (split.splitId() == null) {
+ return new byte[0];
+ }
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+ writeString(out, split.splitId());
+ writeString(out, split.value());
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public SimpleSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+ if (serialized.length == 0) {
+ return new SimpleSourceSplit();
+ }
+
+ try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ final DataInputStream in = new DataInputStream(bais)) {
+ String splitId = readString(in);
+ String value = readString(in);
+ return new SimpleSourceSplit(splitId, value);
+ }
+ }
+
+ private void writeString(DataOutputStream out, String str) throws IOException {
+ byte[] bytes = str.getBytes();
+ out.writeInt(bytes.length);
+ out.write(str.getBytes());
+ }
+
+ private String readString(DataInputStream in) throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ return new String(bytes);
+ }
+}
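A quick round-trip sketch of the serializer above, with hypothetical values:

    // Both the splitId and the payload value survive serialization.
    static SimpleSourceSplit roundTrip(SimpleSourceSplit split) throws IOException {
        SimpleSourceSplitSerializer serializer = new SimpleSourceSplitSerializer();
        byte[] bytes = serializer.serialize(split);
        return serializer.deserialize(serializer.getVersion(), bytes);
    }

    // roundTrip(new SimpleSourceSplit("split-1", "42")) returns a split with
    // splitId() == "split-1" and value() == "42".

Note that writeString/readString use the platform default charset (String.getBytes() and new String(byte[])); that is harmless for the numeric payloads used in this patch, but worth keeping in mind if arbitrary strings are ever stored.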
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SplitListState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SplitListState.java
new file mode 100644
index 000000000000..0049bdf284e3
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SplitListState.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.state.ListState;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class to provide {@link ListState}-like experience for sources that use {@link
+ * SimpleSourceSplit}.
+ */
+public class SplitListState<T> implements ListState<T> {
+ private final String splitPrefix;
+ private final List<T> values;
+ private final Function<T, String> serializer;
+ private final Function<String, T> deserializer;
+
+ public SplitListState(
+ String identifier, Function<T, String> serializer, Function<String, T> deserializer) {
+ Preconditions.checkArgument(
+ !Character.isDigit(identifier.charAt(0)),
+ String.format("Identifier %s should not start with digits.", identifier));
+ this.splitPrefix = identifier.length() + identifier;
+ this.serializer = serializer;
+ this.deserializer = deserializer;
+ this.values = new ArrayList<>();
+ }
+
+ @Override
+ public void add(T value) {
+ values.add(value);
+ }
+
+ @Override
+ public List<T> get() {
+ return new ArrayList<>(values);
+ }
+
+ @Override
+ public void update(List<T> values) {
+ this.values.clear();
+ this.values.addAll(values);
+ }
+
+ @Override
+ public void addAll(List<T> values) throws Exception {
+ this.values.addAll(values);
+ }
+
+ @Override
+ public void clear() {
+ values.clear();
+ }
+
+ public List<SimpleSourceSplit> snapshotState() {
+ return values.stream()
+ .map(x -> new SimpleSourceSplit(splitPrefix + serializer.apply(x)))
+ .collect(Collectors.toList());
+ }
+
+ public void restoreState(List<SimpleSourceSplit> splits) {
+ values.clear();
+ splits.stream()
+ .map(SimpleSourceSplit::value)
+ .filter(x -> x.startsWith(splitPrefix))
+ .map(x -> x.substring(splitPrefix.length()))
+ .map(this.deserializer)
+ .forEach(values::add);
+ }
+}
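The class above lets operator-style list state survive inside reader splits: snapshotState() tags each serialized value with a prefix derived from the state name (identifier length concatenated with the identifier) and wraps it in a SimpleSourceSplit, while restoreState() keeps only the splits carrying that prefix. A small sketch with hypothetical values:

    // Several logical states can share one split list because of the per-state prefix.
    SplitListState<Long> nextSnapshot =
            new SplitListState<>("NS", x -> Long.toString(x), Long::parseLong);
    nextSnapshot.add(5L);

    List<SimpleSourceSplit> splits = nextSnapshot.snapshotState();

    SplitListState<Long> restored =
            new SplitListState<>("NS", x -> Long.toString(x), Long::parseLong);
    restored.restoreState(splits);
    // restored.get() -> [5]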
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
similarity index 66%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
index 2157be51aee4..c3a1258bb176 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSource.java
@@ -21,21 +21,23 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
import org.apache.paimon.flink.compact.MultiTableScanBase;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,15 +49,11 @@
import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY;
/** It is responsible for monitoring compactor source of aware bucket table in batch mode. */
-public class CombinedAwareBatchSourceFunction
- extends CombinedCompactorSourceFunction<Tuple2<Split, String>> {
+public class CombinedAwareBatchSource extends CombinedCompactorSource<Tuple2<Split, String>> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(CombinedAwareBatchSourceFunction.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(CombinedAwareBatchSource.class);
- private MultiTableScanBase> tableScan;
-
- public CombinedAwareBatchSourceFunction(
+ public CombinedAwareBatchSource(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
@@ -63,34 +61,33 @@ public CombinedAwareBatchSourceFunction(
super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) throws Exception {
- open(new Configuration());
+ @Override
+ public SourceReader<Tuple2<Split, String>, SimpleSourceSplit> createReader(
+ SourceReaderContext sourceReaderContext) throws Exception {
+ return new Reader();
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- tableScan =
- new MultiAwareBucketTableScan(
- catalogLoader,
- includingPattern,
- excludingPattern,
- databasePattern,
- isStreaming,
- isRunning);
- }
+ private class Reader extends AbstractNonCoordinatedSourceReader<Tuple2<Split, String>> {
+ private MultiTableScanBase<Tuple2<Split, String>> tableScan;
- @Override
- void scanTable() throws Exception {
- if (isRunning.get()) {
- MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx);
+ @Override
+ public void start() {
+ super.start();
+ tableScan =
+ new MultiAwareBucketTableScan(
+ catalogLoader,
+ includingPattern,
+ excludingPattern,
+ databasePattern,
+ isStreaming);
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Tuple2<Split, String>> readerOutput)
+ throws Exception {
+ MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput);
if (scanResult == FINISHED) {
- return;
+ return InputStatus.END_OF_INPUT;
}
if (scanResult == IS_EMPTY) {
// Currently, in the combined mode, there are two scan tasks for the table of two
@@ -99,6 +96,15 @@ void scanTable() throws Exception {
// should not be thrown exception here.
LOGGER.info("No file were collected for the table of aware-bucket");
}
+ return InputStatus.END_OF_INPUT;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
}
}
@@ -111,15 +117,14 @@ public static DataStream buildSource(
Pattern excludingPattern,
Pattern databasePattern,
Duration partitionIdleTime) {
- CombinedAwareBatchSourceFunction function =
- new CombinedAwareBatchSourceFunction(
+ CombinedAwareBatchSource source =
+ new CombinedAwareBatchSource(
catalogLoader, includingPattern, excludingPattern, databasePattern);
- StreamSource, ?> sourceOperator = new StreamSource<>(function);
TupleTypeInfo<Tuple2<Split, String>> tupleTypeInfo =
new TupleTypeInfo<>(
new JavaTypeInfo<>(Split.class), BasicTypeInfo.STRING_TYPE_INFO);
- return new DataStreamSource<>(
- env, tupleTypeInfo, sourceOperator, false, name, Boundedness.BOUNDED)
+
+ return env.fromSource(source, WatermarkStrategy.noWatermarks(), name, tupleTypeInfo)
.forceNonParallel()
.partitionCustom(
(key, numPartitions) -> key % numPartitions,
@@ -129,12 +134,4 @@ public static DataStream buildSource(
typeInfo,
new MultiTablesReadOperator(catalogLoader, false, partitionIdleTime));
}
-
- @Override
- public void close() throws Exception {
- super.close();
- if (tableScan != null) {
- tableScan.close();
- }
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
similarity index 63%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
index 01e0127e9fda..9bd4a84f571c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSource.java
@@ -21,21 +21,23 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.compact.MultiAwareBucketTableScan;
import org.apache.paimon.flink.compact.MultiTableScanBase;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.table.data.RowData;
import java.util.regex.Pattern;
@@ -44,13 +46,11 @@
import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY;
/** It is responsible for monitoring compactor source of multi bucket table in stream mode. */
-public class CombinedAwareStreamingSourceFunction
- extends CombinedCompactorSourceFunction<Tuple2<Split, String>> {
+public class CombinedAwareStreamingSource extends CombinedCompactorSource<Tuple2<Split, String>> {
private final long monitorInterval;
- private transient MultiTableScanBase> tableScan;
- public CombinedAwareStreamingSourceFunction(
+ public CombinedAwareStreamingSource(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
@@ -60,39 +60,46 @@ public CombinedAwareStreamingSourceFunction(
this.monitorInterval = monitorInterval;
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) throws Exception {
- open(new Configuration());
+ @Override
+ public SourceReader<Tuple2<Split, String>, SimpleSourceSplit> createReader(
+ SourceReaderContext sourceReaderContext) throws Exception {
+ return new Reader();
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- tableScan =
- new MultiAwareBucketTableScan(
- catalogLoader,
- includingPattern,
- excludingPattern,
- databasePattern,
- isStreaming,
- isRunning);
- }
+ private class Reader extends AbstractNonCoordinatedSourceReader<Tuple2<Split, String>> {
+ private transient MultiTableScanBase<Tuple2<Split, String>> tableScan;
- @SuppressWarnings("BusyWait")
- @Override
- void scanTable() throws Exception {
- while (isRunning.get()) {
- MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx);
+ @Override
+ public void start() {
+ super.start();
+ tableScan =
+ new MultiAwareBucketTableScan(
+ catalogLoader,
+ includingPattern,
+ excludingPattern,
+ databasePattern,
+ isStreaming);
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Tuple2<Split, String>> readerOutput)
+ throws Exception {
+ MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput);
if (scanResult == FINISHED) {
- return;
+ return InputStatus.END_OF_INPUT;
}
if (scanResult == IS_EMPTY) {
Thread.sleep(monitorInterval);
}
+ return InputStatus.MORE_AVAILABLE;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
}
}
@@ -106,37 +113,22 @@ public static DataStream buildSource(
Pattern databasePattern,
long monitorInterval) {
- CombinedAwareStreamingSourceFunction function =
- new CombinedAwareStreamingSourceFunction(
+ CombinedAwareStreamingSource source =
+ new CombinedAwareStreamingSource(
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
monitorInterval);
- StreamSource, ?> sourceOperator = new StreamSource<>(function);
- boolean isParallel = false;
TupleTypeInfo<Tuple2<Split, String>> tupleTypeInfo =
new TupleTypeInfo<>(
new JavaTypeInfo<>(Split.class), BasicTypeInfo.STRING_TYPE_INFO);
- return new DataStreamSource<>(
- env,
- tupleTypeInfo,
- sourceOperator,
- isParallel,
- name,
- Boundedness.CONTINUOUS_UNBOUNDED)
+
+ return env.fromSource(source, WatermarkStrategy.noWatermarks(), name, tupleTypeInfo)
.forceNonParallel()
.partitionCustom(
(key, numPartitions) -> key % numPartitions,
split -> ((DataSplit) split.f0).bucket())
.transform(name, typeInfo, new MultiTablesReadOperator(catalogLoader, true));
}
-
- @Override
- public void close() throws Exception {
- super.close();
- if (tableScan != null) {
- tableScan.close();
- }
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
similarity index 63%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
index 02bb8786505d..f58d86cdd65e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSource.java
@@ -20,13 +20,11 @@
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.table.source.Split;
-import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.api.connector.source.Boundedness;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
/**
@@ -45,8 +43,7 @@
* Currently, only dedicated compaction job for multi-tables rely on this monitor. This is the
* single (non-parallel) monitoring task, it is responsible for the new Paimon table.
*/
-public abstract class CombinedCompactorSourceFunction<T> extends RichSourceFunction<T> {
-
+public abstract class CombinedCompactorSource<T> extends AbstractNonCoordinatedSource<T> {
private static final long serialVersionUID = 2L;
protected final Catalog.Loader catalogLoader;
@@ -55,10 +52,7 @@ public abstract class CombinedCompactorSourceFunction extends RichSourceFunct
protected final Pattern databasePattern;
protected final boolean isStreaming;
- protected transient AtomicBoolean isRunning;
- protected transient SourceContext ctx;
-
- public CombinedCompactorSourceFunction(
+ public CombinedCompactorSource(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
@@ -71,37 +65,8 @@ public CombinedCompactorSourceFunction(
this.isStreaming = isStreaming;
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) throws Exception {
- open(new Configuration());
- }
-
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) throws Exception {
- isRunning = new AtomicBoolean(true);
- }
-
@Override
- public void run(SourceContext sourceContext) throws Exception {
- this.ctx = sourceContext;
- scanTable();
+ public Boundedness getBoundedness() {
+ return isStreaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
}
-
- @Override
- public void cancel() {
- // this is to cover the case where cancel() is called before the run()
- if (ctx != null) {
- synchronized (ctx.getCheckpointLock()) {
- isRunning.set(false);
- }
- } else {
- isRunning.set(false);
- }
- }
-
- abstract void scanTable() throws Exception;
}
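The concrete sources in this patch all repeat the same shape on top of this class: the constructor forwards the table patterns and the streaming flag (which now drives getBoundedness()), and createReader() returns an AbstractNonCoordinatedSourceReader whose pollNext() runs a table scan. A condensed, hypothetical sketch of that shape (not one of the actual subclasses):

    import org.apache.flink.api.connector.source.ReaderOutput;
    import org.apache.flink.api.connector.source.SourceReader;
    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.core.io.InputStatus;
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
    import org.apache.paimon.flink.source.SimpleSourceSplit;
    import org.apache.paimon.table.source.Split;
    import java.util.regex.Pattern;

    public class ExampleCompactorSource extends CombinedCompactorSource<Split> {

        public ExampleCompactorSource(
                Catalog.Loader catalogLoader,
                Pattern includingPattern,
                Pattern excludingPattern,
                Pattern databasePattern,
                boolean isStreaming) {
            super(catalogLoader, includingPattern, excludingPattern, databasePattern, isStreaming);
        }

        @Override
        public SourceReader<Split, SimpleSourceSplit> createReader(SourceReaderContext context) {
            return new AbstractNonCoordinatedSourceReader<Split>() {
                @Override
                public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
                    // A real reader emits splits from a scan here, returning MORE_AVAILABLE
                    // while streaming and END_OF_INPUT once the scan is finished.
                    return InputStatus.END_OF_INPUT;
                }
            };
        }
    }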
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
similarity index 71%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
index 6a40f10ada61..64f0c38f5a11 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java
@@ -25,18 +25,20 @@
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.StreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,14 +57,12 @@
* It is responsible for the batch compactor source of the table with unaware bucket in combined
* mode.
*/
-public class CombinedUnawareBatchSourceFunction
- extends CombinedCompactorSourceFunction<MultiTableUnawareAppendCompactionTask> {
+public class CombinedUnawareBatchSource
+ extends CombinedCompactorSource<MultiTableUnawareAppendCompactionTask> {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(CombinedUnawareBatchSourceFunction.class);
- private transient MultiTableScanBase tableScan;
+ private static final Logger LOGGER = LoggerFactory.getLogger(CombinedUnawareBatchSource.class);
- public CombinedUnawareBatchSourceFunction(
+ public CombinedUnawareBatchSource(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
@@ -70,34 +70,34 @@ public CombinedUnawareBatchSourceFunction(
super(catalogLoader, includingPattern, excludingPattern, databasePattern, false);
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) throws Exception {
- open(new Configuration());
+ @Override
+ public SourceReader<MultiTableUnawareAppendCompactionTask, SimpleSourceSplit> createReader(
+ SourceReaderContext sourceReaderContext) throws Exception {
+ return new Reader();
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- tableScan =
- new MultiUnawareBucketTableScan(
- catalogLoader,
- includingPattern,
- excludingPattern,
- databasePattern,
- isStreaming,
- isRunning);
- }
+ private class Reader
+ extends AbstractNonCoordinatedSourceReader<MultiTableUnawareAppendCompactionTask> {
+ private transient MultiTableScanBase<MultiTableUnawareAppendCompactionTask> tableScan;
+
+ @Override
+ public void start() {
+ super.start();
+ tableScan =
+ new MultiUnawareBucketTableScan(
+ catalogLoader,
+ includingPattern,
+ excludingPattern,
+ databasePattern,
+ isStreaming);
+ }
- @Override
- void scanTable() throws Exception {
- if (isRunning.get()) {
- MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx);
+ @Override
+ public InputStatus pollNext(
+ ReaderOutput<MultiTableUnawareAppendCompactionTask> readerOutput) throws Exception {
+ MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput);
if (scanResult == FINISHED) {
- return;
+ return InputStatus.END_OF_INPUT;
}
if (scanResult == IS_EMPTY) {
// Currently, in the combined mode, there are two scan tasks for the table of two
@@ -106,6 +106,15 @@ void scanTable() throws Exception {
// should not be thrown exception here.
LOGGER.info("No file were collected for the table of unaware-bucket");
}
+ return InputStatus.END_OF_INPUT;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
}
}
@@ -117,22 +126,18 @@ public static DataStream buildSource(
Pattern excludingPattern,
Pattern databasePattern,
@Nullable Duration partitionIdleTime) {
- CombinedUnawareBatchSourceFunction function =
- new CombinedUnawareBatchSourceFunction(
+ CombinedUnawareBatchSource combinedUnawareBatchSource =
+ new CombinedUnawareBatchSource(
catalogLoader, includingPattern, excludingPattern, databasePattern);
- StreamSource
- sourceOperator = new StreamSource<>(function);
MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo =
new MultiTableCompactionTaskTypeInfo();
SingleOutputStreamOperator source =
- new DataStreamSource<>(
- env,
- compactionTaskTypeInfo,
- sourceOperator,
- false,
+ env.fromSource(
+ combinedUnawareBatchSource,
+ WatermarkStrategy.noWatermarks(),
name,
- Boundedness.BOUNDED)
+ compactionTaskTypeInfo)
.forceNonParallel();
if (partitionIdleTime != null) {
@@ -177,12 +182,4 @@ private static Long getPartitionInfo(
}
return partitionInfo.get(partition);
}
-
- @Override
- public void close() throws Exception {
- super.close();
- if (tableScan != null) {
- tableScan.close();
- }
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
similarity index 57%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
index b64518a7ef60..6ea1ead4db30 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSource.java
@@ -23,14 +23,16 @@
import org.apache.paimon.flink.compact.MultiTableScanBase;
import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
-import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.StreamSource;
import java.util.regex.Pattern;
@@ -40,13 +42,12 @@
/**
* It is responsible for monitoring compactor source in stream mode for the table of unaware bucket.
*/
-public class CombinedUnawareStreamingSourceFunction
- extends CombinedCompactorSourceFunction<MultiTableUnawareAppendCompactionTask> {
+public class CombinedUnawareStreamingSource
+ extends CombinedCompactorSource<MultiTableUnawareAppendCompactionTask> {
private final long monitorInterval;
- private MultiTableScanBase tableScan;
- public CombinedUnawareStreamingSourceFunction(
+ public CombinedUnawareStreamingSource(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
@@ -56,39 +57,47 @@ public CombinedUnawareStreamingSourceFunction(
this.monitorInterval = monitorInterval;
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 1.18-.
- */
- public void open(OpenContext openContext) throws Exception {
- open(new Configuration());
+ @Override
+ public SourceReader<MultiTableUnawareAppendCompactionTask, SimpleSourceSplit> createReader(
+ SourceReaderContext sourceReaderContext) throws Exception {
+ return new Reader();
}
- /**
- * Do not annotate with @override here to maintain compatibility with Flink 2.0+.
- */
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- tableScan =
- new MultiUnawareBucketTableScan(
- catalogLoader,
- includingPattern,
- excludingPattern,
- databasePattern,
- isStreaming,
- isRunning);
- }
+ private class Reader
+ extends AbstractNonCoordinatedSourceReader<MultiTableUnawareAppendCompactionTask> {
+ private MultiTableScanBase<MultiTableUnawareAppendCompactionTask> tableScan;
- @SuppressWarnings("BusyWait")
- @Override
- void scanTable() throws Exception {
- while (isRunning.get()) {
- MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx);
+ @Override
+ public void start() {
+ super.start();
+ tableScan =
+ new MultiUnawareBucketTableScan(
+ catalogLoader,
+ includingPattern,
+ excludingPattern,
+ databasePattern,
+ isStreaming);
+ }
+
+ @Override
+ public InputStatus pollNext(
+ ReaderOutput<MultiTableUnawareAppendCompactionTask> readerOutput) throws Exception {
+ MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(readerOutput);
if (scanResult == FINISHED) {
- return;
+ return InputStatus.END_OF_INPUT;
}
if (scanResult == IS_EMPTY) {
Thread.sleep(monitorInterval);
}
+ return InputStatus.MORE_AVAILABLE;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableScan != null) {
+ tableScan.close();
+ }
}
}
@@ -101,33 +110,18 @@ public static DataStream buildSource(
Pattern databasePattern,
long monitorInterval) {
- CombinedUnawareStreamingSourceFunction function =
- new CombinedUnawareStreamingSourceFunction(
+ CombinedUnawareStreamingSource source =
+ new CombinedUnawareStreamingSource(
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
monitorInterval);
- StreamSource
- sourceOperator = new StreamSource<>(function);
- boolean isParallel = false;
MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo =
new MultiTableCompactionTaskTypeInfo();
- return new DataStreamSource<>(
- env,
- compactionTaskTypeInfo,
- sourceOperator,
- isParallel,
- name,
- Boundedness.CONTINUOUS_UNBOUNDED)
- .forceNonParallel();
- }
- @Override
- public void close() throws Exception {
- super.close();
- if (tableScan != null) {
- tableScan.close();
- }
+ return env.fromSource(
+ source, WatermarkStrategy.noWatermarks(), name, compactionTaskTypeInfo)
+ .forceNonParallel();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
similarity index 53%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index 3805f6f8c536..4ec0a4f99d9f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -18,6 +18,10 @@
package org.apache.paimon.flink.source.operator;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
+import org.apache.paimon.flink.source.SplitListState;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
@@ -27,22 +31,18 @@
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -71,33 +71,23 @@
* <p>Currently, there are two features that rely on this monitor:
*
* <ul>
- * <li>Consumer-id: rely on this function to do aligned snapshot consumption, and ensure that all
+ * <li>Consumer-id: rely on this source to do aligned snapshot consumption, and ensure that all
* data in a snapshot is consumed within each checkpoint.
* <li>Snapshot-watermark: when there is no watermark definition, the default Paimon table will
* pass the watermark recorded in the snapshot.
* </ul>
*/
*/
-public class MonitorFunction extends RichSourceFunction<Split>
- implements CheckpointedFunction, CheckpointListener {
+public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(MonitorFunction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);
private final ReadBuilder readBuilder;
private final long monitorInterval;
private final boolean emitSnapshotWatermark;
- private volatile boolean isRunning = true;
-
- private transient StreamTableScan scan;
- private transient SourceContext ctx;
-
- private transient ListState checkpointState;
- private transient ListState> nextSnapshotState;
- private transient TreeMap nextSnapshotPerCheckpoint;
-
- public MonitorFunction(
+ public MonitorSource(
ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) {
this.readBuilder = readBuilder;
this.monitorInterval = monitorInterval;
@@ -105,40 +95,74 @@ public MonitorFunction(
}
@Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- this.scan = readBuilder.newStreamScan();
-
- this.checkpointState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- "next-snapshot", LongSerializer.INSTANCE));
-
- @SuppressWarnings("unchecked")
- final Class> typedTuple =
- (Class>) (Class>) Tuple2.class;
- this.nextSnapshotState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- "next-snapshot-per-checkpoint",
- new TupleSerializer<>(
- typedTuple,
- new TypeSerializer[] {
- LongSerializer.INSTANCE, LongSerializer.INSTANCE
- })));
-
- this.nextSnapshotPerCheckpoint = new TreeMap<>();
-
- if (context.isRestored()) {
- LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SourceReader<Split, SimpleSourceSplit> createReader(
+ SourceReaderContext sourceReaderContext) throws Exception {
+ return new Reader();
+ }
+
+ private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
+ private static final String CHECKPOINT_STATE = "CS";
+ private static final String NEXT_SNAPSHOT_STATE = "NSS";
+
+ private final StreamTableScan scan = readBuilder.newStreamScan();
+ private final SplitListState<Long> checkpointState =
+ new SplitListState<>(CHECKPOINT_STATE, x -> Long.toString(x), Long::parseLong);
+ private final SplitListState> nextSnapshotState =
+ new SplitListState<>(
+ NEXT_SNAPSHOT_STATE,
+ x -> x.f0 + ":" + x.f1,
+ x ->
+ Tuple2.of(
+ Long.parseLong(x.split(":")[0]),
+ Long.parseLong(x.split(":")[1])));
+ private final TreeMap<Long, Long> nextSnapshotPerCheckpoint = new TreeMap<>();
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ NavigableMap<Long, Long> nextSnapshots =
+ nextSnapshotPerCheckpoint.headMap(checkpointId, true);
+ OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
+ max.ifPresent(scan::notifyCheckpointComplete);
+ nextSnapshots.clear();
+ }
- List<Long> retrievedStates = new ArrayList<>();
- for (Long entry : this.checkpointState.get()) {
- retrievedStates.add(entry);
+ @Override
+ public List<SimpleSourceSplit> snapshotState(long checkpointId) {
+ this.checkpointState.clear();
+ Long nextSnapshot = this.scan.checkpoint();
+ if (nextSnapshot != null) {
+ this.checkpointState.add(nextSnapshot);
+ this.nextSnapshotPerCheckpoint.put(checkpointId, nextSnapshot);
}
- // given that the parallelism of the function is 1, we can only have 1 retrieved items.
+ List<Tuple2<Long, Long>> nextSnapshots = new ArrayList<>();
+ this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2<>(k, v)));
+ this.nextSnapshotState.update(nextSnapshots);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot);
+ }
+
+ List<SimpleSourceSplit> results = new ArrayList<>();
+ results.addAll(checkpointState.snapshotState());
+ results.addAll(nextSnapshotState.snapshotState());
+ return results;
+ }
+
+ @Override
+ public void addSplits(List<SimpleSourceSplit> list) {
+ LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+ checkpointState.restoreState(list);
+ nextSnapshotState.restoreState(list);
+
+ List<Long> retrievedStates = checkpointState.get();
+
+ // given that the parallelism of the source is 1, we can have at most 1 retrieved item.
Preconditions.checkArgument(
retrievedStates.size() <= 1,
getClass().getSimpleName() + " retrieved invalid state.");
@@ -150,80 +174,31 @@ public void initializeState(FunctionInitializationContext context) throws Except
 for (Tuple2<Long, Long> tuple2 : nextSnapshotState.get()) {
nextSnapshotPerCheckpoint.put(tuple2.f0, tuple2.f1);
}
- } else {
- LOG.info("No state to restore for the {}.", getClass().getSimpleName());
}
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
- this.checkpointState.clear();
- Long nextSnapshot = this.scan.checkpoint();
- if (nextSnapshot != null) {
- this.checkpointState.add(nextSnapshot);
- this.nextSnapshotPerCheckpoint.put(ctx.getCheckpointId(), nextSnapshot);
- }
-
- List<Tuple2<Long, Long>> nextSnapshots = new ArrayList<>();
- this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2<>(k, v)));
- this.nextSnapshotState.update(nextSnapshots);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot);
- }
- }
-
- @SuppressWarnings("BusyWait")
- @Override
- public void run(SourceContext<Split> ctx) throws Exception {
- this.ctx = ctx;
- while (isRunning) {
+ @Override
+ public InputStatus pollNext(ReaderOutput<Split> readerOutput) throws Exception {
boolean isEmpty;
- synchronized (ctx.getCheckpointLock()) {
- if (!isRunning) {
- return;
- }
- try {
- List<Split> splits = scan.plan().splits();
- isEmpty = splits.isEmpty();
- splits.forEach(ctx::collect);
-
- if (emitSnapshotWatermark) {
- Long watermark = scan.watermark();
- if (watermark != null) {
- ctx.emitWatermark(new Watermark(watermark));
- }
+ try {
+ List<Split> splits = scan.plan().splits();
+ isEmpty = splits.isEmpty();
+ splits.forEach(readerOutput::collect);
+
+ if (emitSnapshotWatermark) {
+ Long watermark = scan.watermark();
+ if (watermark != null) {
+ readerOutput.emitWatermark(new Watermark(watermark));
}
- } catch (EndOfScanException esf) {
- LOG.info("Catching EndOfStreamException, the stream is finished.");
- return;
}
+ } catch (EndOfScanException esf) {
+ LOG.info("Catching EndOfStreamException, the stream is finished.");
+ return InputStatus.END_OF_INPUT;
}
if (isEmpty) {
Thread.sleep(monitorInterval);
}
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- NavigableMap<Long, Long> nextSnapshots =
- nextSnapshotPerCheckpoint.headMap(checkpointId, true);
- OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
- max.ifPresent(scan::notifyCheckpointComplete);
- nextSnapshots.clear();
- }
-
- @Override
- public void cancel() {
- // this is to cover the case where cancel() is called before the run()
- if (ctx != null) {
- synchronized (ctx.getCheckpointLock()) {
- isRunning = false;
- }
- } else {
- isRunning = false;
+ return InputStatus.MORE_AVAILABLE;
}
}
@@ -237,9 +212,10 @@ public static DataStream<RowData> buildSource(
boolean shuffleBucketWithPartition,
BucketMode bucketMode) {
 SingleOutputStreamOperator<Split> singleOutputStreamOperator =
- env.addSource(
- new MonitorFunction(
+ env.fromSource(
+ new MonitorSource(
readBuilder, monitorInterval, emitSnapshotWatermark),
+ WatermarkStrategy.noWatermarks(),
name + "-Monitor",
new JavaTypeInfo<>(Split.class))
.forceNonParallel();
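
The pattern applied to MonitorSource above recurs throughout this patch, so a condensed sketch may help when reading the remaining files. The class below is illustrative only and is not part of the change; it assumes the AbstractNonCoordinatedSource, AbstractNonCoordinatedSourceReader, SimpleSourceSplit and SplitListState classes introduced elsewhere in this patch. The idea: pollNext() replaces SourceFunction#run(), snapshotState() returns splits instead of writing ListState, and addSplits() restores them on recovery.

import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;

import java.util.List;

/** Illustrative only: emits 0..9 and keeps its position in split-backed state. */
public class CountingSource extends AbstractNonCoordinatedSource<Long> {

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SourceReader<Long, SimpleSourceSplit> createReader(SourceReaderContext context) {
        return new Reader();
    }

    private static class Reader extends AbstractNonCoordinatedSourceReader<Long> {
        // Replaces operator ListState: values are serialized to strings by the given functions.
        private final SplitListState<Long> position =
                new SplitListState<>("position", x -> Long.toString(x), Long::parseLong);
        private long next = 0;

        @Override
        public InputStatus pollNext(ReaderOutput<Long> output) {
            // pollNext() replaces SourceFunction#run(); no checkpoint lock is needed.
            if (next >= 10) {
                return InputStatus.END_OF_INPUT;
            }
            output.collect(next++);
            return InputStatus.MORE_AVAILABLE;
        }

        @Override
        public List<SimpleSourceSplit> snapshotState(long checkpointId) {
            // Replaces CheckpointedFunction#snapshotState().
            position.clear();
            position.add(next);
            return position.snapshotState();
        }

        @Override
        public void addSplits(List<SimpleSourceSplit> splits) {
            // Replaces the restore branch of CheckpointedFunction#initializeState().
            position.restoreState(splits);
            for (Long restored : position.get()) {
                next = restored;
            }
        }
    }
}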
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index 73d46ae1e3f1..fbc8bb9d756a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -52,9 +52,8 @@
/**
* The operator that reads the Tuple2<{@link Split}, String> received from the preceding {@link
- * CombinedAwareBatchSourceFunction} or {@link CombinedAwareStreamingSourceFunction}. Contrary to
- * the {@link CombinedCompactorSourceFunction} which has a parallelism of 1, this operator can have
- * DOP > 1.
+ * CombinedAwareBatchSource} or {@link CombinedAwareStreamingSource}. Contrary to the {@link
+ * CombinedCompactorSource} which has a parallelism of 1, this operator can have DOP > 1.
*/
 public class MultiTablesReadOperator extends AbstractStreamOperator<RowData>
 implements OneInputStreamOperator<Tuple2<Split, String>, RowData> {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
index c501c2519b41..0864741a178f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java
@@ -44,7 +44,7 @@
/**
* The operator is used for historical partition compaction. It reads {@link
* MultiTableUnawareAppendCompactionTask} received from the preceding {@link
- * CombinedUnawareBatchSourceFunction} and filter partitions which is not historical.
+ * CombinedUnawareBatchSource} and filters partitions which are not historical.
*/
public class MultiUnawareTablesReadOperator
 extends AbstractStreamOperator<MultiTableUnawareAppendCompactionTask>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 80c85f7cdb35..6caf4544e514 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -38,8 +38,8 @@
/**
* The operator that reads the {@link Split splits} received from the preceding {@link
- * MonitorFunction}. Contrary to the {@link MonitorFunction} which has a parallelism of 1, this
- * operator can have DOP > 1.
+ * MonitorSource}. Contrary to the {@link MonitorSource} which has a parallelism of 1, this operator
+ * can have DOP > 1.
*/
 public class ReadOperator extends AbstractStreamOperator<RowData>
 implements OneInputStreamOperator<Split, RowData> {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 6a2c7b071d2d..5245114e80ee 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -36,6 +36,7 @@
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
@@ -450,7 +451,12 @@ private void sinkAndValidate(
throw new UnsupportedOperationException();
}
 DataStreamSource<RowData> source =
- env.addSource(new FiniteTestSource<>(src, true), InternalTypeInfo.of(TABLE_TYPE));
+ env.fromSource(
+ new FiniteTestSource<>(src, true),
+ WatermarkStrategy.noWatermarks(),
+ "FiniteTestSource",
+ InternalTypeInfo.of(TABLE_TYPE));
+ source.forceNonParallel();
new FlinkSinkBuilder(table).forRowData(source).build();
env.execute();
assertThat(iterator.collect(expected.length)).containsExactlyInAnyOrder(expected);
@@ -521,9 +527,13 @@ public static DataStreamSource<RowData> buildTestSource(
StreamExecutionEnvironment env, boolean isBatch) {
return isBatch
? env.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE))
- : env.addSource(
- new FiniteTestSource<>(SOURCE_DATA, false),
- InternalTypeInfo.of(TABLE_TYPE));
+ : (DataStreamSource<RowData>)
+ env.fromSource(
+ new FiniteTestSource<>(SOURCE_DATA, false),
+ WatermarkStrategy.noWatermarks(),
+ "FiniteTestSource",
+ InternalTypeInfo.of(TABLE_TYPE))
+ .forceNonParallel();
}
 public static List<Row> executeAndCollect(DataStream<RowData> source) throws Exception {
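
The same test migration recipe appears in every ITCase touched by this patch: env.addSource(sourceFunction, typeInfo) becomes env.fromSource(source, watermarkStrategy, name, typeInfo), an explicit WatermarkStrategy.noWatermarks() and an operator name must now be supplied, and formerly single-subtask SourceFunctions have to request forceNonParallel() explicitly. A hedged before/after fragment, assuming the env, SOURCE_DATA and TABLE_TYPE fields of FileStoreITCase shown above:

// Before: the pre-patch SourceFunction-based FiniteTestSource, registered via addSource,
// implicitly ran with parallelism 1 because it was not a ParallelSourceFunction.
DataStreamSource<RowData> legacy =
        env.addSource(new FiniteTestSource<>(SOURCE_DATA, false), InternalTypeInfo.of(TABLE_TYPE));

// After: the FLIP-27 style Source; watermark strategy and name are now mandatory,
// and single-subtask behaviour has to be forced because fromSource defaults to the job parallelism.
DataStreamSource<RowData> migrated =
        (DataStreamSource<RowData>)
                env.fromSource(
                                new FiniteTestSource<>(SOURCE_DATA, false),
                                WatermarkStrategy.noWatermarks(),
                                "FiniteTestSource",
                                InternalTypeInfo.of(TABLE_TYPE))
                        .forceNonParallel();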
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FiniteTestSource.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FiniteTestSource.java
index 9c5254d6283b..6691b9c09514 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FiniteTestSource.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FiniteTestSource.java
@@ -18,16 +18,18 @@
package org.apache.paimon.flink;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
+import org.apache.paimon.flink.source.SplitListState;
import org.apache.paimon.utils.Preconditions;
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.ArrayList;
import java.util.List;
@@ -39,8 +41,7 @@
*
* The reason this class is rewritten is to support {@link CheckpointedFunction}.
*/
-public class FiniteTestSource<T>
- implements SourceFunction<T>, CheckpointedFunction, CheckpointListener {
+public class FiniteTestSource<T> extends AbstractNonCoordinatedSource<T> {
private static final long serialVersionUID = 1L;
@@ -48,27 +49,78 @@ public class FiniteTestSource
private final boolean emitOnce;
- private volatile boolean running = true;
-
- private transient int numCheckpointsComplete;
-
- private transient ListState<Integer> checkpointedState;
-
- private volatile int numTimesEmitted;
-
 public FiniteTestSource(List<T> elements, boolean emitOnce) {
this.elements = elements;
this.emitOnce = emitOnce;
}
@Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- this.checkpointedState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>("emit-times", IntSerializer.INSTANCE));
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<T, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext)
+ throws Exception {
+ return new Reader<>(elements, emitOnce);
+ }
+
+ private static class Reader<T> extends AbstractNonCoordinatedSourceReader<T> {
+
+ private final List<T> elements;
+
+ private final boolean emitOnce;
+
+ private final SplitListState<Integer> checkpointedState =
+ new SplitListState<>("emit-times", x -> Integer.toString(x), Integer::parseInt);
+
+ private int numTimesEmitted = 0;
+
+ private int numCheckpointsComplete;
+
+ private Integer checkpointToAwait;
+
+ private Reader(List<T> elements, boolean emitOnce) {
+ this.elements = elements;
+ this.emitOnce = emitOnce;
+ this.numCheckpointsComplete = 0;
+ }
+
+ @Override
+ public synchronized InputStatus pollNext(ReaderOutput<T> readerOutput) {
+ if (checkpointToAwait == null) {
+ checkpointToAwait = numCheckpointsComplete + 2;
+ }
+ switch (numTimesEmitted) {
+ case 0:
+ emitElements(readerOutput, false);
+ if (numCheckpointsComplete < checkpointToAwait) {
+ return InputStatus.MORE_AVAILABLE;
+ }
+ emitElements(readerOutput, true);
+ if (numCheckpointsComplete < checkpointToAwait + 2) {
+ return InputStatus.MORE_AVAILABLE;
+ }
+ break;
+ case 1:
+ emitElements(readerOutput, true);
+ if (numCheckpointsComplete < checkpointToAwait) {
+ return InputStatus.MORE_AVAILABLE;
+ }
+ break;
+ case 2:
+ // Maybe missed notifyCheckpointComplete, wait for the next notifyCheckpointComplete
+ if (numCheckpointsComplete < checkpointToAwait) {
+ return InputStatus.MORE_AVAILABLE;
+ }
+ break;
+ }
+ return InputStatus.END_OF_INPUT;
+ }
- if (context.isRestored()) {
+ @Override
+ public void addSplits(List<SimpleSourceSplit> list) {
+ checkpointedState.restoreState(list);
 List<Integer> retrievedStates = new ArrayList<>();
for (Integer entry : this.checkpointedState.get()) {
retrievedStates.add(entry);
@@ -85,76 +137,27 @@ public void initializeState(FunctionInitializationContext context) throws Except
getClass().getSimpleName()
+ " retrieved invalid numTimesEmitted: "
+ numTimesEmitted);
- } else {
- this.numTimesEmitted = 0;
}
- }
- @Override
- public void run(SourceContext ctx) throws Exception {
- switch (numTimesEmitted) {
- case 0:
- emitElementsAndWaitForCheckpoints(ctx, false);
- emitElementsAndWaitForCheckpoints(ctx, true);
- break;
- case 1:
- emitElementsAndWaitForCheckpoints(ctx, true);
- break;
- case 2:
- // Maybe missed notifyCheckpointComplete, wait next notifyCheckpointComplete
- final Object lock = ctx.getCheckpointLock();
- synchronized (lock) {
- int checkpointToAwait = numCheckpointsComplete + 2;
- while (running && numCheckpointsComplete < checkpointToAwait) {
- lock.wait(1);
- }
- }
- break;
+ @Override
+ public List<SimpleSourceSplit> snapshotState(long l) {
+ this.checkpointedState.clear();
+ this.checkpointedState.add(this.numTimesEmitted);
+ return this.checkpointedState.snapshotState();
}
- }
- private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx, boolean isSecond)
- throws InterruptedException {
- final Object lock = ctx.getCheckpointLock();
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numCheckpointsComplete++;
+ }
- final int checkpointToAwait;
- synchronized (lock) {
- checkpointToAwait = numCheckpointsComplete + 2;
+ private void emitElements(ReaderOutput<T> readerOutput, boolean isSecond) {
if (!isSecond || !emitOnce) {
for (T t : elements) {
- ctx.collect(t);
+ readerOutput.collect(t);
}
}
numTimesEmitted++;
}
-
- synchronized (lock) {
- while (running && numCheckpointsComplete < checkpointToAwait) {
- lock.wait(1);
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- numCheckpointsComplete++;
- }
-
- @Override
- public void notifyCheckpointAborted(long checkpointId) {}
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- Preconditions.checkState(
- this.checkpointedState != null,
- "The " + getClass().getSimpleName() + " has not been properly initialized.");
-
- this.checkpointedState.clear();
- this.checkpointedState.add(this.numTimesEmitted);
}
}
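
Both readers above persist their progress through SplitListState, which lets a non-coordinated reader reuse the split list as a state carrier: snapshotState() turns each value into a split payload and addSplits() turns matching payloads back into values on restore. The helper below is a guess at those mechanics for readers who have not seen the class; it is not Paimon's implementation, and it uses plain strings where the real class wraps SimpleSourceSplit.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative stand-in for SplitListState: values are serialized to strings, prefixed with the
// state name so several logical states can share one split list, and parsed back on restore.
public class StringBackedListState<T> {
    private final String name;
    private final Function<T, String> serializer;
    private final Function<String, T> deserializer;
    private final List<T> values = new ArrayList<>();

    public StringBackedListState(
            String name, Function<T, String> serializer, Function<String, T> deserializer) {
        this.name = name;
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    public void add(T value) {
        values.add(value);
    }

    public void clear() {
        values.clear();
    }

    public List<T> get() {
        return new ArrayList<>(values);
    }

    // Counterpart of SplitListState#snapshotState(): one payload per value.
    public List<String> snapshotState() {
        List<String> payloads = new ArrayList<>();
        for (T value : values) {
            payloads.add(name + ":" + serializer.apply(value));
        }
        return payloads;
    }

    // Counterpart of SplitListState#restoreState(): only payloads carrying this state's name apply,
    // which is why the readers above can pass the full restored list to several states.
    public void restoreState(List<String> payloads) {
        values.clear();
        for (String payload : payloads) {
            if (payload.startsWith(name + ":")) {
                values.add(deserializer.apply(payload.substring(name.length() + 1)));
            }
        }
    }
}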
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
index 594affc124eb..75b96cbe02eb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java
@@ -47,8 +47,10 @@ public SerializableRowData(RowData row, TypeSerializer<RowData> serializer) {
this.serializer = serializer;
}
- private void writeObject(ObjectOutputStream out) throws IOException {
+ private synchronized void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
+ // The following invocation needs to be synchronized to avoid race conditions when the
+ // serializer is reused across multiple subtasks.
serializer.serialize(row, new DataOutputViewStreamWrapper(out));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 6ca78b088fb7..fb8bee5d5962 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -20,7 +20,9 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.utils.RuntimeContextUtils;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.reader.RecordReader;
@@ -30,9 +32,14 @@
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TimeUtils;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
@@ -380,7 +387,12 @@ public void testStatelessWriter() throws Exception {
.checkpointIntervalMs(500)
.build();
 DataStream<Integer> source =
- env.addSource(new TestStatelessWriterSource(table)).setParallelism(2).forward();
+ env.fromSource(
+ new TestStatelessWriterSource(table),
+ WatermarkStrategy.noWatermarks(),
+ "TestStatelessWriterSource")
+ .setParallelism(2)
+ .forward();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.registerCatalog("mycat", sEnv.getCatalog("PAIMON").get());
@@ -392,46 +404,59 @@ public void testStatelessWriter() throws Exception {
.containsExactlyInAnyOrder(Row.of(1, "test"), Row.of(2, "test"));
}
- private static class TestStatelessWriterSource extends RichParallelSourceFunction<Integer> {
+ private static class TestStatelessWriterSource extends AbstractNonCoordinatedSource<Integer> {
private final FileStoreTable table;
- private volatile boolean isRunning = true;
-
private TestStatelessWriterSource(FileStoreTable table) {
this.table = table;
}
@Override
- public void run(SourceContext<Integer> sourceContext) throws Exception {
- int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
- // wait some time in parallelism #2,
- // so that it does not commit in the same checkpoint with parallelism #1
- int waitCount = (taskId == 0 ? 0 : 10);
-
- while (isRunning) {
- synchronized (sourceContext.getCheckpointLock()) {
- if (taskId == 0) {
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SourceReader<Integer, SimpleSourceSplit> createReader(
+ SourceReaderContext sourceReaderContext) throws Exception {
+ return new Reader(sourceReaderContext.getIndexOfSubtask());
+ }
+
+ private class Reader extends AbstractNonCoordinatedSourceReader<Integer> {
+ private final int taskId;
+ private int waitCount;
+
+ private Reader(int taskId) {
+ this.taskId = taskId;
+ this.waitCount = (taskId == 0 ? 0 : 10);
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Integer> readerOutput) throws Exception {
+ if (taskId == 0) {
+ if (waitCount == 0) {
+ readerOutput.collect(1);
+ } else if (countNumRecords() >= 1) {
+ // wait for the record to commit before exiting
+ Thread.sleep(1000);
+ return InputStatus.END_OF_INPUT;
+ }
+ } else {
+ int numRecords = countNumRecords();
+ if (numRecords >= 1) {
if (waitCount == 0) {
- sourceContext.collect(1);
- } else if (countNumRecords() >= 1) {
- // wait for the record to commit before exiting
- break;
- }
- } else {
- int numRecords = countNumRecords();
- if (numRecords >= 1) {
- if (waitCount == 0) {
- sourceContext.collect(2);
- } else if (countNumRecords() >= 2) {
- // make sure the next checkpoint is successful
- break;
- }
+ readerOutput.collect(2);
+ } else if (countNumRecords() >= 2) {
+ // make sure the next checkpoint is successful
+ Thread.sleep(1000);
+ return InputStatus.END_OF_INPUT;
}
}
- waitCount--;
}
+ waitCount--;
Thread.sleep(1000);
+ return InputStatus.MORE_AVAILABLE;
}
}
@@ -447,11 +472,6 @@ private int countNumRecords() throws Exception {
}
return ret;
}
-
- @Override
- public void cancel() {
- isRunning = false;
- }
}
@Override
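
One behavioural detail of the test rewrite above worth calling out: the subtask index that used to come from getRuntimeContext() in the RichParallelSourceFunction is now obtained from the SourceReaderContext handed to createReader(...), which is why the reader receives it through its constructor. A minimal, illustrative sketch of that hand-off, assuming the same Paimon base classes as the patch (the class and field names here are made up for the example):

import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;

/** Illustrative only: each subtask emits its own index once and finishes. */
public class SubtaskIndexSource extends AbstractNonCoordinatedSource<Integer> {

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SourceReader<Integer, SimpleSourceSplit> createReader(SourceReaderContext context) {
        // Replaces RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()).
        return new Reader(context.getIndexOfSubtask());
    }

    private static class Reader extends AbstractNonCoordinatedSourceReader<Integer> {
        private final int subtaskIndex;
        private boolean emitted;

        private Reader(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public InputStatus pollNext(ReaderOutput<Integer> output) {
            if (emitted) {
                return InputStatus.END_OF_INPUT;
            }
            output.collect(subtaskIndex);
            emitted = true;
            return InputStatus.MORE_AVAILABLE;
        }
    }
}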
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 61a03a29a21b..b1e0fb83610e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -33,12 +33,17 @@
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypes;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
@@ -46,6 +51,7 @@
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -58,11 +64,13 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.paimon.CoreOptions.CONSUMER_ID;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link MonitorFunction} and {@link ReadOperator}. */
+/** Test for {@link MonitorSource} and {@link ReadOperator}. */
public class OperatorSourceTest {
@TempDir Path tempDir;
@@ -114,28 +122,39 @@ private List<List<Integer>> readSplit(Split split) throws IOException {
}
@Test
- public void testMonitorFunction() throws Exception {
+ public void testMonitorSource() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. run first
OperatorSubtaskState snapshot;
{
- MonitorFunction function = new MonitorFunction(table.newReadBuilder(), 10, false);
- StreamSource<Split, MonitorFunction> src = new StreamSource<>(function);
+ MonitorSource source = new MonitorSource(table.newReadBuilder(), 10, false);
+ TestingSourceOperator<Split> operator =
+ (TestingSourceOperator<Split>)
+ TestingSourceOperator.createTestOperator(
+ source.createReader(null),
+ WatermarkStrategy.noWatermarks(),
+ false);
 AbstractStreamOperatorTestHarness<Split> testHarness =
- new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+ new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0);
testHarness.open();
- snapshot = testReadSplit(function, () -> testHarness.snapshot(0, 0), 1, 1, 1);
+ snapshot = testReadSplit(operator, () -> testHarness.snapshot(0, 0), 1, 1, 1);
}
// 2. restore from state
{
- MonitorFunction functionCopy1 = new MonitorFunction(table.newReadBuilder(), 10, false);
- StreamSource<Split, MonitorFunction> srcCopy1 = new StreamSource<>(functionCopy1);
+ MonitorSource sourceCopy1 = new MonitorSource(table.newReadBuilder(), 10, false);
+ TestingSourceOperator<Split> operatorCopy1 =
+ (TestingSourceOperator<Split>)
+ TestingSourceOperator.createTestOperator(
+ sourceCopy1.createReader(null),
+ WatermarkStrategy.noWatermarks(),
+ false);
 AbstractStreamOperatorTestHarness<Split> testHarnessCopy1 =
- new AbstractStreamOperatorTestHarness<>(srcCopy1, 1, 1, 0);
+ new AbstractStreamOperatorTestHarness<>(operatorCopy1, 1, 1, 0);
testHarnessCopy1.initializeState(snapshot);
testHarnessCopy1.open();
testReadSplit(
- functionCopy1,
+ operatorCopy1,
() -> {
testHarnessCopy1.snapshot(1, 1);
testHarnessCopy1.notifyOfCompletedCheckpoint(1);
@@ -148,12 +167,17 @@ public void testMonitorFunction() throws Exception {
// 3. restore from consumer id
{
- MonitorFunction functionCopy2 = new MonitorFunction(table.newReadBuilder(), 10, false);
- StreamSource<Split, MonitorFunction> srcCopy2 = new StreamSource<>(functionCopy2);
+ MonitorSource sourceCopy2 = new MonitorSource(table.newReadBuilder(), 10, false);
+ TestingSourceOperator<Split> operatorCopy2 =
+ (TestingSourceOperator<Split>)
+ TestingSourceOperator.createTestOperator(
+ sourceCopy2.createReader(null),
+ WatermarkStrategy.noWatermarks(),
+ false);
 AbstractStreamOperatorTestHarness<Split> testHarnessCopy2 =
- new AbstractStreamOperatorTestHarness<>(srcCopy2, 1, 1, 0);
+ new AbstractStreamOperatorTestHarness<>(operatorCopy2, 1, 1, 0);
testHarnessCopy2.open();
- testReadSplit(functionCopy2, () -> null, 3, 3, 3);
+ testReadSplit(operatorCopy2, () -> null, 3, 3, 3);
}
}
@@ -231,7 +255,7 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
}
 private <T> T testReadSplit(
- MonitorFunction function,
+ SourceOperator<Split, SimpleSourceSplit> operator,
 SupplierWithException<T, Exception> beforeClose,
int a,
int b,
@@ -239,20 +263,36 @@ private T testReadSplit(
throws Exception {
Throwable[] error = new Throwable[1];
 ArrayBlockingQueue<Split> queue = new ArrayBlockingQueue<>(10);
+ AtomicReference> iteratorRef = new AtomicReference<>();
- DummySourceContext sourceContext =
- new DummySourceContext() {
+ PushingAsyncDataInput.DataOutput<Split> output =
+ new PushingAsyncDataInput.DataOutput<Split>() {
@Override
- public void collect(Split element) {
- queue.add(element);
+ public void emitRecord(StreamRecord<Split> streamRecord) {
+ queue.add(streamRecord.getValue());
}
+
+ @Override
+ public void emitWatermark(Watermark watermark) {}
+
+ @Override
+ public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+ @Override
+ public void emitRecordAttributes(RecordAttributes recordAttributes) {}
};
+ AtomicBoolean isRunning = new AtomicBoolean(true);
Thread runner =
new Thread(
() -> {
try {
- function.run(sourceContext);
+ while (isRunning.get()) {
+ operator.emitNext(output);
+ }
} catch (Throwable t) {
t.printStackTrace();
error[0] = t;
@@ -266,34 +306,15 @@ public void collect(Split element) {
assertThat(readSplit(split)).containsExactlyInAnyOrder(Arrays.asList(a, b, c));
T t = beforeClose.get();
- function.cancel();
+ CloseableIterator iterator = iteratorRef.get();
+ if (iterator != null) {
+ iterator.close();
+ }
+ isRunning.set(false);
runner.join();
assertThat(error[0]).isNull();
return t;
}
-
- private abstract static class DummySourceContext
- implements SourceFunction.SourceContext<Split> {
-
- private final Object lock = new Object();
-
- @Override
- public void collectWithTimestamp(Split element, long timestamp) {}
-
- @Override
- public void emitWatermark(Watermark mark) {}
-
- @Override
- public void markAsTemporarilyIdle() {}
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {}
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
new file mode 100644
index 000000000000..77b44d5b0e5c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.flink.source.SimpleSourceSplit;
+import org.apache.paimon.flink.source.SimpleSourceSplitSerializer;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * A SourceOperator extension to simplify test setup.
+ *
+ * <p>This class is implemented in reference to {@link
+ * org.apache.flink.streaming.api.operators.source.TestingSourceOperator}.
+ *
+ * <p>See the Flink PR that introduced this class.
+ */
+public class TestingSourceOperator<T> extends SourceOperator<T, SimpleSourceSplit> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int subtaskIndex;
+ private final int parallelism;
+
+ public TestingSourceOperator(
+ StreamOperatorParameters<T> parameters,
+ SourceReader<T, SimpleSourceSplit> reader,
+ WatermarkStrategy<T> watermarkStrategy,
+ ProcessingTimeService timeService,
+ boolean emitProgressiveWatermarks) {
+
+ this(
+ parameters,
+ reader,
+ watermarkStrategy,
+ timeService,
+ new TestingOperatorEventGateway(),
+ 1,
+ 5,
+ emitProgressiveWatermarks);
+ }
+
+ public TestingSourceOperator(
+ StreamOperatorParameters<T> parameters,
+ SourceReader<T, SimpleSourceSplit> reader,
+ WatermarkStrategy<T> watermarkStrategy,
+ ProcessingTimeService timeService,
+ OperatorEventGateway eventGateway,
+ int subtaskIndex,
+ int parallelism,
+ boolean emitProgressiveWatermarks) {
+
+ super(
+ (context) -> reader,
+ eventGateway,
+ new SimpleSourceSplitSerializer(),
+ watermarkStrategy,
+ timeService,
+ new Configuration(),
+ "localhost",
+ emitProgressiveWatermarks,
+ () -> false);
+
+ this.subtaskIndex = subtaskIndex;
+ this.parallelism = parallelism;
+ this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+ initSourceMetricGroup();
+
+ // unchecked wrapping is okay to keep tests simpler
+ try {
+ initReader();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+ }
+
+ @Override
+ public StreamingRuntimeContext getRuntimeContext() {
+ return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
+ }
+
+ // this is overridden to avoid complex mock injection through the "containingTask"
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ ExecutionConfig cfg = new ExecutionConfig();
+ cfg.setAutoWatermarkInterval(100);
+ return cfg;
+ }
+
+ public static <T> SourceOperator<T, SimpleSourceSplit> createTestOperator(
+ SourceReader<T, SimpleSourceSplit> reader,
+ WatermarkStrategy