diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java new file mode 100644 index 000000000000..2f9e69fe4f70 --- /dev/null +++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; + +public class FlinkSourceCompat { + + public static Counter getNumRecordsInCounter(SourceReaderContext context) { + return ((OperatorMetricGroup) context.metricGroup()) + .getIOMetricGroup() + .getNumRecordsInCounter(); + } +} diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java new file mode 100644 index 000000000000..d6bed940470d --- /dev/null +++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
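Note: on Flink 1.12/1.13, `SourceReaderContext#metricGroup()` returns a plain `MetricGroup`, which is why the class above casts to the runtime `OperatorMetricGroup`; Flink 1.14 exposes the I/O metric group directly (see the 1.14 variant of this class later in this diff). A minimal sketch of how reader code stays version-agnostic by going through the compat layer — `MetricsProbe` is a hypothetical name for illustration, not part of this change:

```java
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.Counter;

// Hypothetical helper: reader code obtains the numRecordsIn counter through the
// compat class, so it never references version-specific metric-group types itself.
final class MetricsProbe {
  private final Counter numRecordsIn;

  MetricsProbe(SourceReaderContext context) {
    // 1.12/1.13: the compat class casts to the runtime OperatorMetricGroup internally.
    // 1.14+: it uses the typed I/O group exposed by SourceReaderContext#metricGroup().
    this.numRecordsIn = FlinkSourceCompat.getNumRecordsInCounter(context);
  }

  void onRecordEmitted() {
    numRecordsIn.inc();
  }
}
```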
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
+    extends SplitEnumerator<SplitT, CheckpointT> {
+
+  CheckpointT snapshotState(long checkpointId) throws Exception;
+}
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
new file mode 100644
index 000000000000..08bba20e576e
--- /dev/null
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Classes helping maintain backwards compatibility across Flink versions. */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
diff --git a/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
new file mode 100644
index 000000000000..1ddc2a957b7d
--- /dev/null
+++ b/runners/flink/1.12/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; + +public class SourceTestCompat { + + /** A MetricGroup implementation which records the registered gauge. */ + public static class TestMetricGroup + extends UnregisteredMetricGroups.UnregisteredOperatorMetricGroup { + public final Map> registeredGauge = new HashMap<>(); + public final Counter numRecordsInCounter = new SimpleCounter(); + + @Override + public > GaugeT gauge(String name, GaugeT gauge) { + registeredGauge.put(name, gauge); + return gauge; + } + + @Override + public OperatorIOMetricGroup getIOMetricGroup() { + return new OperatorIOMetricGroup(this) { + @Override + public Counter getNumRecordsInCounter() { + return numRecordsInCounter; + } + }; + } + } + + public interface ReaderOutputCompat extends ReaderOutput { + void markActive(); + } + + public interface SourceOutputCompat extends SourceOutput { + void markActive(); + } +} diff --git a/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java b/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java new file mode 100644 index 000000000000..06fdd781fc5c --- /dev/null +++ b/runners/flink/1.13/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/SplitEnumeratorCompat.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+public interface SplitEnumeratorCompat<SplitT extends SourceSplit, CheckpointT>
+    extends SplitEnumerator<SplitT, CheckpointT> {
+
+  CheckpointT snapshotState() throws Exception;
+}
diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
new file mode 100644
index 000000000000..f68ae75d38e5
--- /dev/null
+++ b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/compat/FlinkSourceCompat.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.metrics.Counter;
+
+public class FlinkSourceCompat {
+
+  public static Counter getNumRecordsInCounter(SourceReaderContext context) {
+    return context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+  }
+}
diff --git a/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
new file mode 100644
index 000000000000..62b16eedca0b
--- /dev/null
+++ b/runners/flink/1.14/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/SourceTestCompat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
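Note: Flink 1.13 changed `SplitEnumerator#snapshotState()` to take a checkpoint id, so each `SplitEnumeratorCompat` variant declares whichever signature its Flink version lacks; a single enumerator implementation then compiles against 1.12, 1.13, and 1.14 alike (this is how `FlinkSourceSplitEnumerator` later in this diff implements both). A sketch of the pattern — `NoOpSplit` and `NoOpEnumerator` are hypothetical illustrations:

```java
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
import org.apache.flink.api.connector.source.SourceSplit;

// Hypothetical split type: the smallest possible SourceSplit.
class NoOpSplit implements SourceSplit {
  @Override
  public String splitId() {
    return "0";
  }
}

// One implementation satisfies both checkpoint signatures via the compat interface.
class NoOpEnumerator implements SplitEnumeratorCompat<NoOpSplit, Long> {
  @Override
  public void start() {}

  @Override
  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}

  @Override
  public void addSplitsBack(List<NoOpSplit> splits, int subtaskId) {}

  @Override
  public void addReader(int subtaskId) {}

  // Flink 1.13+ invokes this variant and passes the checkpoint id...
  @Override
  public Long snapshotState(long checkpointId) throws Exception {
    return snapshotState();
  }

  // ...Flink 1.12 invokes the no-arg variant; both share one code path.
  @Override
  public Long snapshotState() throws Exception {
    return 0L; // no state to checkpoint in this sketch
  }

  @Override
  public void close() throws IOException {}
}
```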
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +public class SourceTestCompat { + + /** A MetricGroup implementation which records the registered gauge. */ + public static class TestMetricGroup extends UnregisteredMetricsGroup + implements SourceReaderMetricGroup { + public final Map> registeredGauge = new HashMap<>(); + public final Counter numRecordsInCounter = new SimpleCounter(); + + @Override + public OperatorIOMetricGroup getIOMetricGroup() { + return new UnregisteredOperatorIOMetricGroup() { + @Override + public Counter getNumRecordsInCounter() { + return numRecordsInCounter; + } + }; + } + + @Override + public > GaugeT gauge(String name, GaugeT gauge) { + registeredGauge.put(name, gauge); + return gauge; + } + + @Override + public Counter getNumRecordsInErrorsCounter() { + return new SimpleCounter(); + } + + @Override + public void setPendingBytesGauge(Gauge pendingBytesGauge) {} + + @Override + public void setPendingRecordsGauge(Gauge pendingRecordsGauge) {} + } + + private static class UnregisteredOperatorIOMetricGroup extends UnregisteredMetricsGroup + implements OperatorIOMetricGroup { + @Override + public Counter getNumRecordsInCounter() { + return new SimpleCounter(); + } + + @Override + public Counter getNumRecordsOutCounter() { + return new SimpleCounter(); + } + + @Override + public Counter getNumBytesInCounter() { + return new SimpleCounter(); + } + + @Override + public Counter getNumBytesOutCounter() { + return new SimpleCounter(); + } + } + + public interface ReaderOutputCompat extends ReaderOutput {} + + public interface SourceOutputCompat extends SourceOutput {} +} diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index ccd4f75d3b7c..cdfe921c1b2e 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -179,6 +179,7 @@ dependencies { if (flink_version.compareTo("1.14") >= 0) { implementation "org.apache.flink:flink-runtime:$flink_version" implementation "org.apache.flink:flink-optimizer:$flink_version" + implementation "org.apache.flink:flink-metrics-core:$flink_version" testImplementation "org.apache.flink:flink-runtime:$flink_version:tests" testImplementation "org.apache.flink:flink-rpc-akka:$flink_version" } else { @@ -197,6 +198,7 @@ dependencies { testImplementation project(":sdks:java:io:google-cloud-platform") testImplementation library.java.jackson_dataformat_yaml testImplementation "org.apache.flink:flink-core:$flink_version:tests" + testImplementation "org.apache.flink:flink-connector-test-utils:$flink_version" testImplementation project(":sdks:java:harness") testRuntimeOnly library.java.slf4j_simple validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java new file mode 100644 index 000000000000..c502faeb7a63 --- /dev/null +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerdeUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.utils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +/** Util methods to help with serialization / deserialization. */ +public class SerdeUtils { + + // Private constructor for a util class. + private SerdeUtils() {} + + public static @Nonnull byte[] serializeObject(@Nullable Object obj) throws IOException { + if (obj == null) { + return new byte[0]; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(obj); + oos.close(); + return baos.toByteArray(); + } + + @SuppressWarnings("unchecked") + public static @Nullable Object deserializeObject(byte[] serialized) throws IOException { + if (serialized == null || serialized.length == 0) { + return null; + } + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + ObjectInputStream ois = new ObjectInputStream(bais)) { + return ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + public static SimpleVersionedSerializer getNaiveObjectSerializer() { + return new SimpleVersionedSerializer() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(T obj) throws IOException { + return serializeObject(obj); + } + + @Override + @SuppressWarnings("unchecked") + public T deserialize(int version, byte[] serialized) throws IOException { + if (version > getVersion()) { + throw new IOException( + String.format( + "Received serialized object of version %d, which is higher than " + + "the highest supported version %d.", + version, getVersion())); + } + return (T) deserializeObject(serialized); + } + }; + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java new file mode 100644 index 000000000000..c001b263340c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
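Note: to make the "naive" serializer concrete, the round trip below is all it does — plain Java serialization behind Flink's `SimpleVersionedSerializer` contract, so any `Serializable` type works, at the cost of Java serialization's usual size and compatibility caveats. A sketch, not part of the change:

```java
import java.io.IOException;
import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class SerdeUtilsRoundTrip {
  public static void main(String[] args) throws IOException {
    SimpleVersionedSerializer<String> serializer = SerdeUtils.getNaiveObjectSerializer();
    byte[] bytes = serializer.serialize("hello");
    // Version 0 is the only version the naive serializer knows about.
    String restored = serializer.deserialize(serializer.getVersion(), bytes);
    System.out.println(restored); // prints "hello"
  }
}
```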
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.utils.SerdeUtils; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +/** + * The base class for {@link FlinkBoundedSource} and {@link FlinkUnboundedSource}. + * + * @param The data type of the records emitted by the raw Beam sources. + * @param The data type of the records emitted by the Flink Source. + */ +public abstract class FlinkSource + implements Source, Map>>> { + protected final org.apache.beam.sdk.io.Source beamSource; + protected final Boundedness boundedness; + protected final SerializablePipelineOptions serializablePipelineOptions; + + private final int numSplits; + + // ----------------- public static methods to construct sources -------------------- + + public static FlinkBoundedSource bounded( + BoundedSource boundedSource, + SerializablePipelineOptions serializablePipelineOptions, + int numSplits) { + return new FlinkBoundedSource<>( + boundedSource, serializablePipelineOptions, Boundedness.BOUNDED, numSplits); + } + + public static FlinkUnboundedSource unbounded( + UnboundedSource source, + SerializablePipelineOptions serializablePipelineOptions, + int numSplits) { + return new FlinkUnboundedSource<>(source, serializablePipelineOptions, numSplits); + } + + public static FlinkBoundedSource unboundedImpulse(long shutdownSourceAfterIdleMs) { + FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults(); + flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs); + // Here we wrap the BeamImpulseSource with a FlinkBoundedSource, but overriding its + // boundedness to CONTINUOUS_UNBOUNDED. By doing so, the Flink engine will treat this + // source as an unbounded source and execute the job in streaming mode. 
This also + // works well with checkpoint, because the FlinkSourceSplit containing the + // BeamImpulseSource will be discarded after the impulse emission. So the streaming + // job won't see another impulse after failover. + return new FlinkBoundedSource<>( + new BeamImpulseSource(), + new SerializablePipelineOptions(flinkPipelineOptions), + Boundedness.CONTINUOUS_UNBOUNDED, + 1, + record -> Watermark.MAX_WATERMARK.getTimestamp()); + } + + public static FlinkBoundedSource boundedImpulse() { + return new FlinkBoundedSource<>( + new BeamImpulseSource(), + new SerializablePipelineOptions(FlinkPipelineOptions.defaults()), + Boundedness.BOUNDED, + 1, + record -> Watermark.MAX_WATERMARK.getTimestamp()); + } + + // ------ Common implementations for both bounded and unbounded source --------- + + protected FlinkSource( + org.apache.beam.sdk.io.Source beamSource, + SerializablePipelineOptions serializablePipelineOptions, + Boundedness boundedness, + int numSplits) { + this.beamSource = beamSource; + this.serializablePipelineOptions = serializablePipelineOptions; + this.boundedness = boundedness; + this.numSplits = numSplits; + } + + @Override + public Boundedness getBoundedness() { + return boundedness; + } + + @Override + public SplitEnumerator, Map>>> + createEnumerator(SplitEnumeratorContext> enumContext) throws Exception { + return new FlinkSourceSplitEnumerator<>( + enumContext, beamSource, serializablePipelineOptions.get(), numSplits); + } + + @Override + public SplitEnumerator, Map>>> + restoreEnumerator( + SplitEnumeratorContext> enumContext, + Map>> checkpoint) + throws Exception { + FlinkSourceSplitEnumerator enumerator = + new FlinkSourceSplitEnumerator<>( + enumContext, beamSource, serializablePipelineOptions.get(), numSplits); + checkpoint.forEach( + (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId)); + return enumerator; + } + + @Override + public SimpleVersionedSerializer> getSplitSerializer() { + return FlinkSourceSplit.serializer(); + } + + @Override + public SimpleVersionedSerializer>>> + getEnumeratorCheckpointSerializer() { + return SerdeUtils.getNaiveObjectSerializer(); + } + + public int getNumSplits() { + return numSplits; + } + + @FunctionalInterface + public interface TimestampExtractor extends Function, Serializable {} +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java new file mode 100644 index 000000000000..27b84910ac27 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
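Note: the impulse factories above are how the runner materializes Beam's `Impulse` transform. A sketch of the streaming case (the 10-second idle timeout is an arbitrary example value):

```java
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;

public class ImpulseFactoryExample {
  // Returns a single-split impulse source that Flink treats as unbounded
  // (boundedness overridden to CONTINUOUS_UNBOUNDED), forcing streaming execution;
  // the reader idles out and shuts down 10s after the one impulse element is emitted.
  public static FlinkBoundedSource<byte[]> streamingImpulse() {
    return FlinkSource.unboundedImpulse(10_000L);
  }
}
```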
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.metrics.Counter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract implementation of {@link SourceReader} which encapsulates {@link Source Beam Sources} + * for data reading. + * + *
 + * <ol>
 + *   <li>Idle timeout support.
 + *   <li>Splits addition handling.
 + *   <li>Split reader creation and management.
 + *   <li>Checkpoint management.
 + * </ol>
 + *
 + * <p>
This implementation provides unified logic for both {@link BoundedSource} and {@link + * UnboundedSource}. The subclasses are expected to only implement the {@link + * #pollNext(ReaderOutput)} method. + * + * @param the output element type from the encapsulated {@link Source Beam sources.} + */ +public abstract class FlinkSourceReaderBase + implements SourceReader> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class); + protected static final CompletableFuture AVAILABLE_NOW = + CompletableFuture.completedFuture(null); + // Some dummy instances to make the annotation checker happy with AtomicReference. + protected static final CompletableFuture DUMMY_FUTURE = new CompletableFuture<>(); + protected static final Exception NO_EXCEPTION = new Exception(); + + protected final PipelineOptions pipelineOptions; + protected final @Nullable Function timestampExtractor; + private final Queue> sourceSplits = new ArrayDeque<>(); + // This needs to be a ConcurrentHashMap because the metric retrieving thread may access it. + private final ConcurrentMap beamSourceReaders; + protected final SourceReaderContext context; + private final ScheduledExecutorService executor; + + protected final Counter numRecordsInCounter; + protected final long idleTimeoutMs; + private final CompletableFuture idleTimeoutFuture; + private final AtomicReference exception; + private boolean idleTimeoutCountingDown; + private CompletableFuture waitingForSplitChangeFuture; + private boolean noMoreSplits; + + protected FlinkSourceReaderBase( + SourceReaderContext context, + PipelineOptions pipelineOptions, + @Nullable Function timestampExtractor) { + this( + Executors.newSingleThreadScheduledExecutor( + r -> new Thread(r, "FlinkSource-Executor-Thread-" + context.getIndexOfSubtask())), + context, + pipelineOptions, + timestampExtractor); + } + + protected FlinkSourceReaderBase( + ScheduledExecutorService executor, + SourceReaderContext context, + PipelineOptions pipelineOptions, + @Nullable Function timestampExtractor) { + this.context = context; + this.pipelineOptions = pipelineOptions; + this.timestampExtractor = timestampExtractor; + this.beamSourceReaders = new ConcurrentHashMap<>(); + this.exception = new AtomicReference<>(NO_EXCEPTION); + this.executor = executor; + this.idleTimeoutMs = + pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs(); + this.idleTimeoutFuture = new CompletableFuture<>(); + this.waitingForSplitChangeFuture = new CompletableFuture<>(); + this.idleTimeoutCountingDown = false; + // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is + // upgraded to 1.14 and above. + this.numRecordsInCounter = FlinkSourceCompat.getNumRecordsInCounter(context); + } + + @Override + public void start() {} + + @Override + public List> snapshotState(long checkpointId) { + checkExceptionAndMaybeThrow(); + // Add all the source splits whose readers haven't been created. + List> splitsState = new ArrayList<>(sourceSplits); + + // Add all the source splits being actively read. + beamSourceReaders.forEach( + (splitId, readerAndOutput) -> { + Source.Reader reader = readerAndOutput.reader; + if (reader instanceof BoundedSource.BoundedReader) { + // Sometimes users may decide to run a bounded source in streaming mode as "finite + // stream." + // For bounded source, the checkpoint granularity is the entire source split. + // So, in case of failure, all the data from this split will be consumed again. 
+ splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource())); + } else if (reader instanceof UnboundedSource.UnboundedReader) { + // The checkpoint for unbounded sources is fine granular. + byte[] checkpointState = + getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader) reader); + splitsState.add( + new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState)); + } + }); + return splitsState; + } + + @Override + public CompletableFuture isAvailable() { + checkExceptionAndMaybeThrow(); + if (!sourceSplits.isEmpty() || !beamSourceReaders.isEmpty()) { + // There are still live readers. + CompletableFuture aliveReaderAvailableFuture = isAvailableForAliveReaders(); + // Regardless of whether there is data available from the alive readers, the + // main thread needs to be woken up if there is a split change. Hence, we + // need to combine the data available future with the split change future. + if (waitingForSplitChangeFuture.isDone()) { + waitingForSplitChangeFuture = new CompletableFuture<>(); + } + return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture) + .thenAccept(ignored -> {}); + } else if (noMoreSplits) { + // All the splits have been read, wait for idle timeout. + checkIdleTimeoutAndMaybeStartCountdown(); + return idleTimeoutFuture; + } else { + // There is no live readers, waiting for new split assignments or no more splits notification. + if (waitingForSplitChangeFuture.isDone()) { + waitingForSplitChangeFuture = new CompletableFuture<>(); + } + return waitingForSplitChangeFuture; + } + } + + @Override + public void notifyNoMoreSplits() { + checkExceptionAndMaybeThrow(); + LOG.info("Received NoMoreSplits signal from enumerator."); + noMoreSplits = true; + waitingForSplitChangeFuture.complete(null); + } + + @Override + public void addSplits(List> splits) { + checkExceptionAndMaybeThrow(); + LOG.info("Adding splits {}", splits); + sourceSplits.addAll(splits); + waitingForSplitChangeFuture.complete(null); + } + + @Override + public void close() throws Exception { + for (ReaderAndOutput readerAndOutput : beamSourceReaders.values()) { + readerAndOutput.reader.close(); + } + executor.shutdown(); + } + + // ----------------- protected abstract methods ---------------------- + + /** + * This method needs to be overridden by subclasses to determine if data is available when there + * are alive readers. For example, an unbounded source may not have any source split ready for + * data emission even if all the sources are still alive. Whereas for the bounded source, data is + * always available as long as there are alive readers. 
+ */ + protected abstract CompletableFuture isAvailableForAliveReaders(); + + // ----------------- protected helper methods for subclasses -------------------- + + protected Optional createAndTrackNextReader() throws IOException { + FlinkSourceSplit sourceSplit = sourceSplits.poll(); + if (sourceSplit != null) { + Source.Reader reader = createReader(sourceSplit); + ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false); + beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput); + return Optional.of(readerAndOutput); + } + return Optional.empty(); + } + + protected void finishSplit(int splitIndex) throws IOException { + ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex); + if (readerAndOutput != null) { + LOG.info("Finished reading from split {}", readerAndOutput.splitId); + readerAndOutput.reader.close(); + } else { + throw new IllegalStateException( + "SourceReader for split " + splitIndex + " should never be null!"); + } + } + + protected boolean checkIdleTimeoutAndMaybeStartCountdown() { + if (idleTimeoutMs <= 0) { + idleTimeoutFuture.complete(null); + } else if (!idleTimeoutCountingDown) { + scheduleTask(() -> idleTimeoutFuture.complete(null), idleTimeoutMs); + idleTimeoutCountingDown = true; + } + return idleTimeoutFuture.isDone(); + } + + protected boolean noMoreSplits() { + return noMoreSplits; + } + + protected void scheduleTask(Runnable runnable, long delayMs) { + ignoreReturnValue( + executor.schedule(new ErrorRecordingRunnable(runnable), delayMs, TimeUnit.MILLISECONDS)); + } + + protected void scheduleTaskAtFixedRate(Runnable runnable, long delayMs, long periodMs) { + ignoreReturnValue( + executor.scheduleAtFixedRate( + new ErrorRecordingRunnable(runnable), delayMs, periodMs, TimeUnit.MILLISECONDS)); + } + + protected void execute(Runnable runnable) { + executor.execute(new ErrorRecordingRunnable(runnable)); + } + + protected void recordException(Throwable e) { + if (!exception.compareAndSet(NO_EXCEPTION, e)) { + exception.get().addSuppressed(e); + } + } + + protected void checkExceptionAndMaybeThrow() { + if (exception.get() != NO_EXCEPTION) { + throw new RuntimeException("The source reader received exception.", exception.get()); + } + } + + protected boolean hasException() { + return exception.get() != NO_EXCEPTION; + } + + protected Collection> sourceSplits() { + return Collections.unmodifiableCollection(sourceSplits); + } + + protected Map allReaders() { + return Collections.unmodifiableMap(beamSourceReaders); + } + + protected static void ignoreReturnValue(Object o) { + // do nothing. 
+ } + // ------------------------------ private methods ------------------------------ + + @SuppressWarnings("unchecked") + private + byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader reader) { + UnboundedSource source = + (UnboundedSource) reader.getCurrentSource(); + CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark(); + Coder coder = source.getCheckpointMarkCoder(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + coder.encode(checkpointMark, baos); + return baos.toByteArray(); + } catch (IOException ioe) { + throw new RuntimeException("Failed to encode checkpoint mark.", ioe); + } + } + + private Source.Reader createReader(@Nonnull FlinkSourceSplit sourceSplit) + throws IOException { + Source beamSource = sourceSplit.getBeamSplitSource(); + if (beamSource instanceof BoundedSource) { + return ((BoundedSource) beamSource).createReader(pipelineOptions); + } else if (beamSource instanceof UnboundedSource) { + return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState()); + } else { + throw new IllegalStateException("Unknown source type " + beamSource.getClass()); + } + } + + private + Source.Reader createUnboundedSourceReader( + Source beamSource, @Nullable byte[] splitState) throws IOException { + UnboundedSource unboundedSource = + (UnboundedSource) beamSource; + Coder coder = unboundedSource.getCheckpointMarkCoder(); + if (splitState == null) { + return unboundedSource.createReader(pipelineOptions, null); + } else { + try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) { + return unboundedSource.createReader(pipelineOptions, coder.decode(bais)); + } + } + } + + // -------------------- protected helper class --------------------- + + /** A wrapper for the reader and its associated information. */ + protected final class ReaderAndOutput { + public final String splitId; + public final Source.Reader reader; + private boolean started; + private @Nullable SourceOutput outputForSplit; + + public ReaderAndOutput(String splitId, Source.Reader reader, boolean started) { + this.splitId = splitId; + this.reader = reader; + this.started = started; + this.outputForSplit = null; + } + + public SourceOutput getAndMaybeCreateSplitOutput(ReaderOutput output) { + if (outputForSplit == null) { + outputForSplit = output.createOutputForSplit(splitId); + } + return outputForSplit; + } + + public boolean startOrAdvance() throws IOException { + if (started) { + return reader.advance(); + } else { + started = true; + return reader.start(); + } + } + + public @Nullable SourceOutput sourceOutput() { + return outputForSplit; + } + } + + private final class ErrorRecordingRunnable implements Runnable { + private final Runnable runnable; + + ErrorRecordingRunnable(Runnable r) { + this.runnable = r; + } + + @Override + public void run() { + try { + runnable.run(); + } catch (Throwable t) { + recordException(t); + } + } + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java new file mode 100644 index 000000000000..32fcd23344d9 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
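Note: the unbounded checkpoint path above reduces to a coder round trip — `snapshotState()` encodes the reader's `CheckpointMark` into the split's `byte[]` state, and `createUnboundedSourceReader()` decodes it on recovery. A standalone sketch of that cycle:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;

public class CheckpointMarkRoundTrip {
  // Mirrors getAndEncodeCheckpointMark() (encode) and createUnboundedSourceReader()
  // (decode): the mark travels inside FlinkSourceSplit as a plain byte[], so only
  // the coder needs to agree on both sides.
  public static <MarkT> MarkT roundTrip(MarkT mark, Coder<MarkT> coder) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    coder.encode(mark, baos);
    try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) {
      return coder.decode(bais);
    }
  }
}
```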
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.runners.flink.translation.utils.SerdeUtils; +import org.apache.beam.sdk.io.Source; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +/** + * A Flink {@link SourceSplit} implementation that encapsulates a Beam {@link Source}. This class + * also serves as the holder of the checkpoint state of the Beam {@link + * org.apache.beam.sdk.io.Source.Reader Reader} created from the encapsulated source. So, the Source + * can recreate the Reader from the checkpointed state upon failure recovery. + * + * @param The output type of the encapsulated Beam {@link Source}. + */ +public class FlinkSourceSplit implements SourceSplit, Serializable { + // The index of the split. + private final int splitIndex; + private final Source beamSplitSource; + private final @Nullable byte[] splitState; + + public FlinkSourceSplit(int splitIndex, Source beamSplitSource) { + this(splitIndex, beamSplitSource, null); + } + + public FlinkSourceSplit(int splitIndex, Source beamSplitSource, @Nullable byte[] splitState) { + this.splitIndex = splitIndex; + this.beamSplitSource = beamSplitSource; + this.splitState = splitState; + } + + public int splitIndex() { + return splitIndex; + } + + public @Nullable byte[] getSplitState() { + return splitState; + } + + public Source getBeamSplitSource() { + return beamSplitSource; + } + + @Override + public String splitId() { + return Integer.toString(splitIndex); + } + + @Override + public String toString() { + return String.format("[SplitIndex: %d, BeamSource: %s]", splitIndex, beamSplitSource); + } + + public static SimpleVersionedSerializer> serializer() { + return SerdeUtils.getNaiveObjectSerializer(); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java new file mode 100644 index 000000000000..292697479bcd --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
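Note: since `FlinkSourceSplit` is `Serializable` and its serializer is the naive one from `SerdeUtils`, a split (including the wrapped Beam source) round-trips as below — a sketch using the `BeamImpulseSource` added later in this diff:

```java
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class SplitRoundTrip {
  public static void main(String[] args) throws Exception {
    FlinkSourceSplit<byte[]> split = new FlinkSourceSplit<>(0, new BeamImpulseSource());
    SimpleVersionedSerializer<FlinkSourceSplit<byte[]>> serializer = FlinkSourceSplit.serializer();
    byte[] bytes = serializer.serialize(split);
    FlinkSourceSplit<byte[]> restored = serializer.deserialize(serializer.getVersion(), bytes);
    System.out.println(restored.splitId()); // prints "0"
  }
}
```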
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Flink {@link org.apache.flink.api.connector.source.SplitEnumerator SplitEnumerator} + * implementation that holds a Beam {@link Source} and does the following: + * + *
 + * <ul>
 + *   <li>Split the Beam {@link Source} into the desired number of splits.
 + *   <li>Assign the splits to the Flink source readers.
 + * </ul>
 + *
 + * <p>
Note that at this point, this class has a static round-robin split assignment strategy. + * + * @param The output type of the encapsulated Beam {@link Source}. + */ +public class FlinkSourceSplitEnumerator + implements SplitEnumeratorCompat, Map>>> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class); + private final SplitEnumeratorContext> context; + private final Source beamSource; + private final PipelineOptions pipelineOptions; + private final int numSplits; + private final Map>> pendingSplits; + private boolean splitsInitialized; + + public FlinkSourceSplitEnumerator( + SplitEnumeratorContext> context, + Source beamSource, + PipelineOptions pipelineOptions, + int numSplits) { + this.context = context; + this.beamSource = beamSource; + this.pipelineOptions = pipelineOptions; + this.numSplits = numSplits; + this.pendingSplits = new HashMap<>(numSplits); + this.splitsInitialized = false; + } + + @Override + public void start() { + context.callAsync( + () -> { + try { + List> beamSplitSourceList = splitBeamSource(); + Map>> flinkSourceSplitsList = new HashMap<>(); + int i = 0; + for (Source beamSplitSource : beamSplitSourceList) { + int targetSubtask = i % context.currentParallelism(); + List> splitsForTask = + flinkSourceSplitsList.computeIfAbsent( + targetSubtask, ignored -> new ArrayList<>()); + splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource)); + i++; + } + return flinkSourceSplitsList; + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + (sourceSplits, error) -> { + if (error != null) { + throw new RuntimeException("Failed to start source enumerator.", error); + } else { + pendingSplits.putAll(sourceSplits); + splitsInitialized = true; + sendPendingSplitsToSourceReaders(); + } + }); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + // Not used. + } + + @Override + public void addSplitsBack(List> splits, int subtaskId) { + LOG.info("Adding splits {} back from subtask {}", splits, subtaskId); + List> splitsForSubtask = + pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>()); + splitsForSubtask.addAll(splits); + } + + @Override + public void addReader(int subtaskId) { + List> splitsForSubtask = pendingSplits.remove(subtaskId); + if (splitsForSubtask != null) { + assignSplitsAndLog(splitsForSubtask, subtaskId); + pendingSplits.remove(subtaskId); + } else { + if (splitsInitialized) { + LOG.info("There is no split for subtask {}. 
Signaling no more splits.", subtaskId); + context.signalNoMoreSplits(subtaskId); + } + } + } + + @Override + public Map>> snapshotState(long checkpointId) throws Exception { + LOG.info("Taking snapshot for checkpoint {}", checkpointId); + return snapshotState(); + } + + @Override + public Map>> snapshotState() throws Exception { + return pendingSplits; + } + + @Override + public void close() throws IOException { + // NoOp + } + + // -------------- Private helper methods ---------------------- + private List> splitBeamSource() throws Exception { + if (beamSource instanceof BoundedSource) { + BoundedSource boundedSource = (BoundedSource) beamSource; + long desiredSizeBytes = boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits; + return boundedSource.split(desiredSizeBytes, pipelineOptions); + } else if (beamSource instanceof UnboundedSource) { + return ((UnboundedSource) beamSource).split(numSplits, pipelineOptions); + } else { + throw new IllegalStateException("Unknown source type " + beamSource.getClass()); + } + } + + private void sendPendingSplitsToSourceReaders() { + Iterator>>> splitIter = + pendingSplits.entrySet().iterator(); + while (splitIter.hasNext()) { + Map.Entry>> entry = splitIter.next(); + int readerIndex = entry.getKey(); + int targetSubtask = readerIndex % context.currentParallelism(); + if (context.registeredReaders().containsKey(targetSubtask)) { + assignSplitsAndLog(entry.getValue(), targetSubtask); + splitIter.remove(); + } + } + } + + private void assignSplitsAndLog(List> splits, int subtaskId) { + context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtaskId, splits))); + context.signalNoMoreSplits(subtaskId); + LOG.info("Assigned splits {} to subtask {}", splits, subtaskId); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java new file mode 100644 index 000000000000..c2bd904dcc60 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
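Note: the sizing and assignment decisions in `splitBeamSource()` and `sendPendingSplitsToSourceReaders()` above are simple arithmetic; a toy illustration with made-up numbers:

```java
public class AssignmentMath {
  public static void main(String[] args) {
    long estimatedSizeBytes = 10L * 1024 * 1024; // pretend estimate from the source
    int numSplits = 4;
    long desiredSizeBytes = estimatedSizeBytes / numSplits; // passed to BoundedSource#split

    int parallelism = 3;
    for (int splitIndex = 0; splitIndex < numSplits; splitIndex++) {
      int targetSubtask = splitIndex % parallelism; // static round-robin owner
      System.out.printf(
          "split %d -> subtask %d (desired size %d bytes)%n",
          splitIndex, targetSubtask, desiredSizeBytes);
    }
  }
}
```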
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded; + +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; + +/** + * A Flink {@link org.apache.flink.api.connector.source.Source Source} implementation that wraps a + * Beam {@link BoundedSource BoundedSource}. + * + *
 + * <p>
A {@link FlinkBoundedSource} can run in either batch or streaming mode, depending on its + * {@link Boundedness} setting. For a BoundedSource running in streaming mode, it is acting like a + * "finite stream". + * + * @param The output type of the wrapped Beam {@link BoundedSource BoundedSource}. + */ +public class FlinkBoundedSource extends FlinkSource> { + protected final @Nullable TimestampExtractor> timestampExtractor; + + public FlinkBoundedSource( + BoundedSource beamSource, + SerializablePipelineOptions serializablePipelineOptions, + Boundedness boundedness, + int numSplits) { + this(beamSource, serializablePipelineOptions, boundedness, numSplits, null); + } + + public FlinkBoundedSource( + BoundedSource beamSource, + SerializablePipelineOptions serializablePipelineOptions, + Boundedness boundedness, + int numSplits, + @Nullable TimestampExtractor> timestampExtractor) { + super(beamSource, serializablePipelineOptions, boundedness, numSplits); + this.timestampExtractor = timestampExtractor; + } + + @Override + public SourceReader, FlinkSourceSplit> createReader( + SourceReaderContext readerContext) throws Exception { + return new FlinkBoundedSourceReader<>( + readerContext, serializablePipelineOptions.get(), timestampExtractor); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java new file mode 100644 index 000000000000..9cea73f6a4a3 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
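Note: a sketch of wiring the bounded wrapper into Flink's unified Source API, with `BeamImpulseSource` as a stand-in for any `BoundedSource<byte[]>`; in practice an explicit `TypeInformation` may be needed since the output type is generic:

```java
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedWiringExample {
  public static DataStream<WindowedValue<byte[]>> wire(StreamExecutionEnvironment env) {
    FlinkBoundedSource<byte[]> source =
        FlinkSource.bounded(
            new BeamImpulseSource(),
            new SerializablePipelineOptions(FlinkPipelineOptions.defaults()),
            /* numSplits= */ 2);
    return env.fromSource(source, WatermarkStrategy.noWatermarks(), "beam-bounded-source");
  }
}
```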
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Flink {@link org.apache.flink.api.connector.source.SourceReader SourceReader} implementation + * that reads from the assigned {@link + * org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit + * FlinkSourceSplits} by using Beam {@link org.apache.beam.sdk.io.BoundedSource.BoundedReader + * BoundedReaders}. + * + *
 + * <p>
This reader consumes the source splits one by one sequentially, instead of concurrently. + * + * @param the output element type of the encapsulated Beam {@link + * org.apache.beam.sdk.io.BoundedSource.BoundedReader BoundedReader.} + */ +public class FlinkBoundedSourceReader extends FlinkSourceReaderBase> { + private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class); + private @Nullable Source.Reader currentReader; + private int currentSplitId; + + public FlinkBoundedSourceReader( + SourceReaderContext context, + PipelineOptions pipelineOptions, + @Nullable Function, Long> timestampExtractor) { + super(context, pipelineOptions, timestampExtractor); + currentSplitId = -1; + } + + @VisibleForTesting + protected FlinkBoundedSourceReader( + SourceReaderContext context, + PipelineOptions pipelineOptions, + ScheduledExecutorService executor, + @Nullable Function, Long> timestampExtractor) { + super(executor, context, pipelineOptions, timestampExtractor); + currentSplitId = -1; + } + + @Override + public InputStatus pollNext(ReaderOutput> output) throws Exception { + checkExceptionAndMaybeThrow(); + if (currentReader == null && !moveToNextNonEmptyReader()) { + // Nothing to read for now. + if (noMoreSplits() && checkIdleTimeoutAndMaybeStartCountdown()) { + // All the source splits have been read and idle timeout has passed. + LOG.info( + "All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs); + return InputStatus.END_OF_INPUT; + } else { + // This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle + // timeout. + return InputStatus.NOTHING_AVAILABLE; + } + } + Source.Reader tempCurrentReader = currentReader; + if (tempCurrentReader != null) { + T record = tempCurrentReader.getCurrent(); + WindowedValue windowedValue = + WindowedValue.of( + record, + tempCurrentReader.getCurrentTimestamp(), + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING); + if (timestampExtractor == null) { + output.collect(windowedValue); + } else { + output.collect(windowedValue, timestampExtractor.apply(windowedValue)); + } + numRecordsInCounter.inc(); + // If the advance() invocation throws exception here, the job will just fail over and read + // everything again from + // the beginning. So the failover granularity is the entire Flink job. + if (!tempCurrentReader.advance()) { + finishSplit(currentSplitId); + currentReader = null; + currentSplitId = -1; + LOG.debug("Finished reading from {}", currentSplitId); + } + // Always return MORE_AVAILABLE here regardless of the availability of next record. If there + // is no more + // records available in the current split, the next invocation of pollNext() will handle that. + return InputStatus.MORE_AVAILABLE; + } else { + throw new IllegalArgumentException( + "If we reach here, the current beam reader should not be null"); + } + } + + @Override + protected CompletableFuture isAvailableForAliveReaders() { + // For bounded source, as long as there are active readers, the data is available. 
+ return AVAILABLE_NOW; + } + + // ------------------------- private helper methods -------------------------- + + private boolean moveToNextNonEmptyReader() throws IOException { + Optional readerAndOutput; + while ((readerAndOutput = createAndTrackNextReader()).isPresent()) { + ReaderAndOutput rao = readerAndOutput.get(); + if (rao.reader.start()) { + currentSplitId = Integer.parseInt(rao.splitId); + currentReader = rao.reader; + return true; + } else { + finishSplit(Integer.parseInt(rao.splitId)); + } + } + return false; + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java new file mode 100644 index 000000000000..7722241331d2 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** Internal implementation of the Beam runner for Apache Flink. */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java new file mode 100644 index 000000000000..cbf1871dfbaf --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
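The pollNext() contract above has two subtleties: the reader reports MORE_AVAILABLE after every emitted record even when the current split has just been exhausted, and END_OF_INPUT is only reachable after the enumerator's no-more-splits signal (plus the idle timeout, when one is configured). A dependency-free sketch of the same state machine, with hypothetical names and the idle-timeout countdown omitted:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Models the control flow of FlinkBoundedSourceReader.pollNext(): at most one
// record per call, MORE_AVAILABLE after every emitted record, END_OF_INPUT
// only once the no-more-splits signal has been received.
public class BoundedPollSketch {
  enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

  private final Queue<Queue<Integer>> pendingSplits = new ArrayDeque<>();
  private Queue<Integer> currentSplit;
  private boolean noMoreSplits;

  Status pollNext(Queue<Integer> output) {
    if (currentSplit == null && !moveToNextNonEmptySplit()) {
      // Nothing to read; terminal only after the no-more-splits signal.
      return noMoreSplits ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
    }
    output.add(currentSplit.poll());
    if (currentSplit.isEmpty()) {
      currentSplit = null; // Split finished; the next call moves on.
    }
    return Status.MORE_AVAILABLE; // Never decided by look-ahead.
  }

  private boolean moveToNextNonEmptySplit() {
    while (!pendingSplits.isEmpty()) {
      Queue<Integer> candidate = pendingSplits.poll();
      if (!candidate.isEmpty()) {
        currentSplit = candidate;
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    BoundedPollSketch reader = new BoundedPollSketch();
    reader.pendingSplits.add(new ArrayDeque<>(List.of(1, 2)));
    reader.noMoreSplits = true;
    Queue<Integer> out = new ArrayDeque<>();
    Status status;
    do {
      status = reader.pollNext(out);
    } while (status != Status.END_OF_INPUT);
    System.out.println(out); // [1, 2]
  }
}
```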
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.joda.time.Instant; + +/** A Beam {@link BoundedSource} for Impulse Source. */ +public class BeamImpulseSource extends BoundedSource { + + @Override + public List> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + // Always return a single split. + return Collections.singletonList(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 0; + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + return new ImpulseReader(this); + } + + private static class ImpulseReader extends BoundedSource.BoundedReader { + private final BeamImpulseSource source; + private boolean started; + private int index; + + private ImpulseReader(BeamImpulseSource source) { + this.source = source; + this.started = false; + this.index = 0; + } + + @Override + public boolean start() throws IOException { + started = true; + return true; + } + + @Override + public boolean advance() throws IOException { + if (!started) { + throw new IllegalStateException("start() should be called before calling advance()"); + } + index++; + return false; + } + + @Override + public byte[] getCurrent() throws NoSuchElementException { + if (!started) { + throw new IllegalStateException("The reader hasn't started."); + } + if (index == 0) { + return new byte[0]; + } else { + throw new NoSuchElementException("No element is available."); + } + } + + @Override + public BoundedSource getCurrentSource() { + return source; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (!started) { + throw new IllegalStateException("The reader hasn't started."); + } + if (index == 0) { + return BoundedWindow.TIMESTAMP_MIN_VALUE; + } else { + throw new NoSuchElementException("No element is available."); + } + } + + @Override + public void close() throws IOException {} + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java new file mode 100644 index 000000000000..3fd38e2259d1 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** Internal implementation of the Beam runner for Apache Flink. */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java new file mode 100644 index 000000000000..448b3fef6d21 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** Internal implementation of the Beam runner for Apache Flink. */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java new file mode 100644 index 000000000000..b40492201700 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSource.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
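The impulse source above emits exactly one empty byte array, stamped with the minimum watermark. A throwaway harness (hypothetical class, assumed to live in the same package as BeamImpulseSource) that walks the reader through its full lifecycle:

```java
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ImpulseSourceDemo {
  public static void main(String[] args) throws Exception {
    BeamImpulseSource source = new BeamImpulseSource();
    try (BoundedSource.BoundedReader<byte[]> reader =
        source.createReader(PipelineOptionsFactory.create())) {
      System.out.println(reader.start());               // true: the single impulse element
      System.out.println(reader.getCurrent().length);   // 0: an empty byte array
      System.out.println(reader.getCurrentTimestamp()); // BoundedWindow.TIMESTAMP_MIN_VALUE
      System.out.println(reader.advance());             // false: there is no second element
    }
  }
}
```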
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded; + +import javax.annotation.Nullable; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.ValueWithRecordId; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; + +/** + * A Flink {@link org.apache.flink.api.connector.source.Source Source} implementation that wraps a + * Beam {@link org.apache.beam.sdk.io.UnboundedSource UnboundedSource}. + * + * @param The output type of the wrapped Beam {@link org.apache.beam.sdk.io.UnboundedSource + * UnboundedSource}. + */ +public class FlinkUnboundedSource extends FlinkSource>> { + private final @Nullable TimestampExtractor>> + timestampExtractor; + + public FlinkUnboundedSource( + UnboundedSource beamSource, + SerializablePipelineOptions serializablePipelineOptions, + int numSplits) { + this(beamSource, serializablePipelineOptions, numSplits, null); + } + + public FlinkUnboundedSource( + UnboundedSource beamSource, + SerializablePipelineOptions serializablePipelineOptions, + int numSplits, + @Nullable TimestampExtractor>> timestampExtractor) { + super(beamSource, serializablePipelineOptions, Boundedness.CONTINUOUS_UNBOUNDED, numSplits); + this.timestampExtractor = timestampExtractor; + } + + @Override + public SourceReader>, FlinkSourceSplit> createReader( + SourceReaderContext readerContext) throws Exception { + return new FlinkUnboundedSourceReader<>( + readerContext, serializablePipelineOptions.get(), timestampExtractor); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java new file mode 100644 index 000000000000..3c596360efd7 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
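A sketch of how this wrapper might be instantiated (illustrative only; TestCountingSource is the test source modified later in this diff, the split count of 4 is arbitrary, and same-package placement is assumed):

```java
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
import org.apache.beam.sdk.values.KV;

public class UnboundedWiringSketch {
  static FlinkUnboundedSource<KV<Integer, Integer>> wrap() {
    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
    // numSplits usually tracks the desired source parallelism; 4 is arbitrary.
    return new FlinkUnboundedSource<>(
        new TestCountingSource(100), new SerializablePipelineOptions(options), 4);
  }
}
```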
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.ValueWithRecordId; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Flink {@link org.apache.flink.api.connector.source.SourceReader SourceReader} implementation + * that reads from the assigned {@link + * org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit + * FlinkSourceSplits} by using Beam {@link org.apache.beam.sdk.io.UnboundedSource.UnboundedReader + * UnboundedReaders}. + * + *

+ * <p>This reader consumes all the assigned source splits concurrently.
+ *
+ * @param <T> the output element type of the encapsulated Beam {@link
+ *     org.apache.beam.sdk.io.UnboundedSource.UnboundedReader UnboundedReader}.
+ */
+public class FlinkUnboundedSourceReader<T>
+    extends FlinkSourceReaderBase<T, WindowedValue<ValueWithRecordId<T>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkUnboundedSourceReader.class);
+  // This name is defined in FLIP-33.
+  @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes";
+  private static final long SLEEP_ON_IDLE_MS = 50L;
+  private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef;
+  private final List<ReaderAndOutput> readers;
+  private int currentReaderIndex;
+  private volatile boolean shouldEmitWatermark;
+
+  public FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
+    super(context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @VisibleForTesting
+  protected FlinkUnboundedSourceReader(
+      SourceReaderContext context,
+      PipelineOptions pipelineOptions,
+      ScheduledExecutorService executor,
+      @Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
+    super(executor, context, pipelineOptions, timestampExtractor);
+    this.readers = new ArrayList<>();
+    this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
+    this.currentReaderIndex = 0;
+  }
+
+  @Override
+  public void start() {
+    createPendingBytesGauge(context);
+    Long watermarkInterval =
+        pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
+    if (watermarkInterval != null) {
+      scheduleTaskAtFixedRate(
+          () -> {
+            // Set the watermark emission flag first.
+            shouldEmitWatermark = true;
+            // Wake up the main thread if necessary.
+            CompletableFuture<Void> f = dataAvailableFutureRef.get();
+            if (f != DUMMY_FUTURE) {
+              f.complete(null);
+            }
+          },
+          watermarkInterval,
+          watermarkInterval);
+    } else {
+      LOG.warn("AutoWatermarkInterval is not set, watermarks won't be emitted.");
+    }
+  }
+
+  @Override
+  public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output)
+      throws Exception {
+    checkExceptionAndMaybeThrow();
+    maybeEmitWatermark();
+    maybeCreateReaderForNewSplits();
+
+    ReaderAndOutput reader = nextReaderWithData();
+    if (reader != null) {
+      emitRecord(reader, output);
+      return InputStatus.MORE_AVAILABLE;
+    } else {
+      LOG.trace("No data available for now.");
+      return InputStatus.NOTHING_AVAILABLE;
+    }
+  }
+
+  /**
+   * Check whether there is data available from alive readers. If not, set a future and wait for
+   * the periodically running wake-up task to complete that future when the check interval passes.
+   * This method is only called by the main thread, which is the only thread writing to the future
+   * ref. Note that for an UnboundedSource, because the splits never finish, there are always
+   * alive readers after the first split assignment. Hence, the return value of {@link
+   * FlinkSourceReaderBase#isAvailable()} will effectively be determined by this method after the
+   * first split assignment.
+   */
+  @Override
+  protected CompletableFuture<Void> isAvailableForAliveReaders() {
+    CompletableFuture<Void> future = dataAvailableFutureRef.get();
+    if (future == DUMMY_FUTURE) {
+      CompletableFuture<Void> newFuture = new CompletableFuture<>();
+      // Need to set the future first to avoid the race condition of missing the watermark
+      // emission notification.
+      dataAvailableFutureRef.set(newFuture);
+      if (shouldEmitWatermark || hasException()) {
+        // A watermark became due or an exception occurred after we set the new future;
+        // immediately complete the future and return.
+        dataAvailableFutureRef.set(DUMMY_FUTURE);
+        newFuture.complete(null);
+      } else {
+        LOG.debug("There is no data available, scheduling the idle reader checker.");
+        scheduleTask(
+            () -> {
+              CompletableFuture<Void> f = dataAvailableFutureRef.get();
+              if (f != DUMMY_FUTURE) {
+                f.complete(null);
+              }
+            },
+            SLEEP_ON_IDLE_MS);
+      }
+      return newFuture;
+    } else if (future.isDone()) {
+      // The previous future is completed, just use it and reset the future ref.
+      dataAvailableFutureRef.getAndSet(DUMMY_FUTURE);
+      return future;
+    } else {
+      // The previous future has not been completed, just use it.
+      return future;
+    }
+  }
+
+  // -------------- private helper methods ----------------
+
+  private void emitRecord(
+      ReaderAndOutput readerAndOutput, ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output) {
+    UnboundedSource.UnboundedReader<T> reader = asUnbounded(readerAndOutput.reader);
+    T item = reader.getCurrent();
+    byte[] recordId = reader.getCurrentRecordId();
+    Instant timestamp = reader.getCurrentTimestamp();
+
+    WindowedValue<ValueWithRecordId<T>> windowedValue =
+        WindowedValue.of(
+            new ValueWithRecordId<>(item, recordId),
+            timestamp,
+            GlobalWindow.INSTANCE,
+            PaneInfo.NO_FIRING);
+    LOG.trace("Emitting record: {}", windowedValue);
+    if (timestampExtractor == null) {
+      readerAndOutput.getAndMaybeCreateSplitOutput(output).collect(windowedValue);
+    } else {
+      readerAndOutput
+          .getAndMaybeCreateSplitOutput(output)
+          .collect(windowedValue, timestampExtractor.apply(windowedValue));
+    }
+    numRecordsInCounter.inc();
+  }
+
+  private void maybeEmitWatermark() {
+    // Here we rely on the Flink source watermark multiplexer to combine the per-split watermarks.
+    // The runnable may emit more than one watermark when it runs.
+    if (shouldEmitWatermark) {
+      allReaders()
+          .values()
+          .forEach(
+              readerAndOutput -> {
+                SourceOutput<WindowedValue<ValueWithRecordId<T>>> sourceOutput =
+                    readerAndOutput.sourceOutput();
+                if (sourceOutput != null) {
+                  long watermark = asUnbounded(readerAndOutput.reader).getWatermark().getMillis();
+                  sourceOutput.emitWatermark(new Watermark(watermark));
+                }
+              });
+      shouldEmitWatermark = false;
+    }
+  }
+
+  private void maybeCreateReaderForNewSplits() throws Exception {
+    while (!sourceSplits().isEmpty()) {
+      Optional<ReaderAndOutput> readerAndOutputOpt = createAndTrackNextReader();
+      if (readerAndOutputOpt.isPresent()) {
+        readers.add(readerAndOutputOpt.get());
+      } else {
+        // A null splitId is only possible when an exception occurs; check for the exception and
+        // throw it.
+        checkExceptionAndMaybeThrow();
+      }
+    }
+  }
+
+  private @Nullable ReaderAndOutput nextReaderWithData() throws IOException {
+    int numReaders = readers.size();
+    for (int i = 0; i < numReaders; i++) {
+      ReaderAndOutput readerAndOutput = readers.get(currentReaderIndex);
+      currentReaderIndex = (currentReaderIndex + 1) % numReaders;
+      if (readerAndOutput.startOrAdvance()) {
+        return readerAndOutput;
+      }
+    }
+    return null;
+  }
+
+  private static <T> UnboundedSource.UnboundedReader<T> asUnbounded(Source.Reader<T> reader) {
+    return (UnboundedSource.UnboundedReader<T>) reader;
+  }
+
+  private void createPendingBytesGauge(SourceReaderContext context) {
+    // TODO: Replace with SourceReaderContext.metricGroup().setPendingBytesGauge() in Flink 1.14
+    // and above.
+ context + .metricGroup() + .gauge( + PENDING_BYTES_METRIC_NAME, + () -> { + long pendingBytes = -1L; + for (FlinkSourceReaderBase.ReaderAndOutput readerAndOutput : + allReaders().values()) { + long pendingBytesForReader = + asUnbounded(readerAndOutput.reader).getSplitBacklogBytes(); + if (pendingBytesForReader != -1L) { + pendingBytes = + pendingBytes == -1L + ? pendingBytesForReader + : pendingBytes + pendingBytesForReader; + } + } + return pendingBytes; + }); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java new file mode 100644 index 000000000000..0a853b66f4d9 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** Internal implementation of the Beam runner for Apache Flink. */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java new file mode 100644 index 000000000000..68438a701204 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestBoundedCountingSource.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
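The gauge body above encodes a small aggregation rule: -1 mirrors Beam's UnboundedReader.BACKLOG_UNKNOWN sentinel, known backlogs are summed, and the result stays -1 only when every reader reports unknown. The same rule as a standalone function:

```java
// -1 represents an unknown backlog, matching UnboundedReader.BACKLOG_UNKNOWN.
static long aggregatePendingBytes(long[] perReaderBacklog) {
  long pendingBytes = -1L;
  for (long backlog : perReaderBacklog) {
    if (backlog != -1L) {
      // Unknown readers are skipped; known backlogs accumulate.
      pendingBytes = pendingBytes == -1L ? backlog : pendingBytes + backlog;
    }
  }
  return pendingBytes;
}
// aggregatePendingBytes(new long[] {-1L, 7L, 3L}) -> 10
// aggregatePendingBytes(new long[] {-1L, -1L})    -> -1
```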
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.TestSource; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.values.KV; + +public class TestBoundedCountingSource extends BoundedSource> + implements TestSource { + private final int totalNumRecords; + private final TestCountingSource source; + private final List createdReaders; + private int nextValueForValidating; + private long nextTimestampForValidating; + + public TestBoundedCountingSource(int shardNum, int totalNumRecords) { + this.totalNumRecords = totalNumRecords; + this.source = + new TestCountingSource(totalNumRecords).withShardNumber(shardNum).withFixedNumSplits(1); + this.createdReaders = new ArrayList<>(); + this.nextValueForValidating = 0; + this.nextTimestampForValidating = 0; + } + + @Override + public List>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + List splits = new ArrayList<>(); + int numRecordsAssigned = 0; + int shardNum = 0; + while (numRecordsAssigned < totalNumRecords) { + int numRecordsForSplit = + (int) Math.min(totalNumRecords - numRecordsAssigned, desiredBundleSizeBytes); + splits.add(new TestBoundedCountingSource(shardNum, numRecordsForSplit)); + numRecordsAssigned += numRecordsForSplit; + shardNum++; + } + return splits; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return totalNumRecords; + } + + @Override + public BoundedReader> createReader(PipelineOptions options) + throws IOException { + BoundedTestCountingSourceReader reader = + new BoundedTestCountingSourceReader(source.createReader(options, null), this); + createdReaders.add(reader); + return reader; + } + + @Override + public List createdReaders() { + return createdReaders; + } + + @Override + public boolean validateNextValue(int value) { + boolean result = value == nextValueForValidating; + nextValueForValidating++; + return result; + } + + @Override + public boolean validateNextTimestamp(long timestamp) { + boolean result = timestamp == nextTimestampForValidating; + nextTimestampForValidating++; + return result; + } + + @Override + public boolean isConsumptionCompleted() { + return nextValueForValidating == totalNumRecords; + } + + @Override + public boolean allTimestampsReceived() { + return nextTimestampForValidating == nextValueForValidating; + } + + public static class BoundedTestCountingSourceReader + extends BoundedSource.BoundedReader> implements TestReader { + + private final TestCountingSource.CountingSourceReader reader; + private final TestBoundedCountingSource currentSource; + private boolean closed; + + private BoundedTestCountingSourceReader( + TestCountingSource.CountingSourceReader reader, TestBoundedCountingSource currentSource) { + this.reader = reader; + this.currentSource = currentSource; + this.closed = false; + } + + @Override + public boolean start() throws IOException { + return reader.start(); + } + + @Override + public boolean advance() throws IOException { + return reader.advance(); + } + + @Override + public KV getCurrent() throws NoSuchElementException { + return reader.getCurrent(); + } + + @Override + public void close() throws IOException { + closed = true; + reader.close(); + } + + @Override + public BoundedSource> 
getCurrentSource() { + return currentSource; + } + + @Override + public boolean isClosed() { + return closed; + } + } +} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java index 41932dd1e498..5c54ce4c44e1 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.TestSource; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DelegateCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -45,7 +46,8 @@ * where not all the data is available immediately. */ public class TestCountingSource - extends UnboundedSource, TestCountingSource.CounterMark> { + extends UnboundedSource, TestCountingSource.CounterMark> + implements TestSource { private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class); private static List finalizeTracker; @@ -65,6 +67,12 @@ public class TestCountingSource */ private static boolean thrown = false; + private final List createdReaders; + + private int nextValueForValidating; + + private long nextTimestampForValidating; + public static void setFinalizeTracker(List finalizeTracker) { TestCountingSource.finalizeTracker = finalizeTracker; } @@ -77,7 +85,7 @@ public TestCountingSource withDedup() { return new TestCountingSource(numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, -1); } - private TestCountingSource withShardNumber(int shardNumber) { + public TestCountingSource withShardNumber(int shardNumber) { return new TestCountingSource( numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, -1); } @@ -107,6 +115,7 @@ private TestCountingSource( this.dedup = dedup; this.throwOnFirstSnapshot = throwOnFirstSnapshot; this.fixedNumSplits = fixedNumSplits; + this.createdReaders = new ArrayList<>(); } /** Halts emission of elements until {@code continueEmission} is invoked. 
*/ @@ -129,7 +138,7 @@ public List split(int desiredNumSplits, PipelineOptions opti return splits; } - static class CounterMark implements UnboundedSource.CheckpointMark { + public static class CounterMark implements UnboundedSource.CheckpointMark { int current; public CounterMark(int current) { @@ -144,6 +153,35 @@ public void finalizeCheckpoint() { } } + @Override + public List createdReaders() { + return createdReaders; + } + + @Override + public boolean validateNextValue(int value) { + boolean result = value == nextValueForValidating; + nextValueForValidating++; + return result; + } + + @Override + public boolean validateNextTimestamp(long timestamp) { + boolean result = timestamp == nextTimestampForValidating; + nextTimestampForValidating++; + return result; + } + + @Override + public boolean isConsumptionCompleted() { + return nextValueForValidating == numMessagesPerShard; + } + + @Override + public boolean allTimestampsReceived() { + return nextTimestampForValidating == nextValueForValidating; + } + @Override public Coder getCheckpointMarkCoder() { return DelegateCoder.of(VarIntCoder.of(), new FromCounterMark(), new ToCounterMark()); @@ -158,11 +196,14 @@ public boolean requiresDeduping() { * Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to {@link * TestCountingSource#createReader(PipelineOptions, CounterMark)} without cast. */ - public class CountingSourceReader extends UnboundedReader> { + public class CountingSourceReader extends UnboundedReader> + implements TestReader { private int current; + private boolean closed; public CountingSourceReader(int startingPoint) { this.current = startingPoint; + this.closed = false; } @Override @@ -203,7 +244,9 @@ public byte[] getCurrentRecordId() { } @Override - public void close() {} + public void close() { + closed = true; + } @Override public TestCountingSource getCurrentSource() { @@ -238,6 +281,11 @@ public CounterMark getCheckpointMark() { public long getSplitBacklogBytes() { return 7L; } + + @Override + public boolean isClosed() { + return closed; + } } @Override @@ -248,7 +296,10 @@ public CountingSourceReader createReader( } else { LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current); } - return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1); + CountingSourceReader reader = + new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1); + createdReaders.add(reader); + return reader; } @Override diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java new file mode 100644 index 000000000000..dcab3aff0f5b --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; +import org.junit.Test; +import org.mockito.Mockito; + +/** An abstract test base for {@link FlinkSourceReaderBase}. */ +public abstract class FlinkSourceReaderTestBase { + + // -------------- test poll -------------- + @Test(timeout = 30000L) + public void testPollBasic() throws Exception { + testPoll(5, 10); + } + + @Test(timeout = 30000L) + public void testPollFromEmptySplit() throws Exception { + testPoll(3, 0); + } + + @Test + public void testPollWithTimestampExtractor() throws Exception { + testPoll(5, 10, record -> getKVPairs(record).getValue().longValue()); + } + + @Test + public void testExceptionInExecutorThread() throws Exception { + try (SourceReader>> reader = createReader()) { + reader.start(); + ReaderOutput output = Mockito.mock(ReaderOutput.class); + // The first poll should not throw any exception. 
+ reader.pollNext(output); + + RuntimeException expectedException = new RuntimeException(); + RuntimeException suppressedException = new RuntimeException(); + ((FlinkSourceReaderBase) reader) + .execute( + () -> { + throw expectedException; + }); + ((FlinkSourceReaderBase) reader) + .execute( + () -> { + throw suppressedException; + }); + CountDownLatch countDownLatch = new CountDownLatch(1); + ((FlinkSourceReaderBase) reader).execute(countDownLatch::countDown); + countDownLatch.await(); + + try { + reader.pollNext(output); + fail("Should have thrown exception here."); + } catch (Exception e) { + Throwable actualException = e; + while (actualException != expectedException && actualException.getCause() != null) { + actualException = actualException.getCause(); + } + assertEquals(expectedException, actualException); + assertEquals(1, actualException.getSuppressed().length); + assertEquals(suppressedException, actualException.getSuppressed()[0]); + } + } + } + + private void testPoll(int numSplits, int numRecordsPerSplit) throws Exception { + testPoll(numSplits, numRecordsPerSplit, null); + } + + private void testPoll( + int numSplits, int numRecordsPerSplit, @Nullable Function timestampExtractor) + throws Exception { + List>> splits = + createSplits(numSplits, numRecordsPerSplit, 0); + try (SourceReader>> reader = + createReader(timestampExtractor)) { + pollAndValidate(reader, splits, timestampExtractor != null); + } + verifyBeamReaderClosed(splits); + } + + // This test may fail if the subclass of FlinkSourceReaderBase overrides + // the isAvailable() method, which should have a good reason. + @Test + public void testIsAvailableOnNoMoreSplitsNotification() throws Exception { + try (SourceReader>> reader = createReader()) { + reader.start(); + + // No splits assigned yet. + CompletableFuture future1 = reader.isAvailable(); + assertFalse("No split assigned yet, should not be available.", future1.isDone()); + + // Data available on split assigned. + reader.notifyNoMoreSplits(); + assertTrue("Future1 should be completed upon no more splits notification", future1.isDone()); + assertTrue( + "Completed future should be returned so pollNext can be invoked to get updated INPUT_STATUS", + reader.isAvailable().isDone()); + } + } + + // This test may fail if the subclass of FlinkSourceReaderBase overrides + // the isAvailable() method, which should have a good reason. + @Test + public void testIsAvailableWithIdleTimeout() throws Exception { + ManuallyTriggeredScheduledExecutorService executor = + new ManuallyTriggeredScheduledExecutorService(); + try (SourceReader>> reader = + createReader(executor, 1L)) { + reader.start(); + + CompletableFuture future1 = reader.isAvailable(); + assertFalse("Future1 should be uncompleted without live split.", future1.isDone()); + + reader.notifyNoMoreSplits(); + assertTrue("Future1 should be completed upon no more splits notification.", future1.isDone()); + CompletableFuture future2 = reader.isAvailable(); + assertFalse("Future2 should be uncompleted when waiting for idle timeout", future2.isDone()); + + executor.triggerScheduledTasks(); + assertTrue("Future2 should be completed after idle timeout.", future2.isDone()); + assertTrue( + "The future should always be completed after idle timeout.", + reader.isAvailable().isDone()); + } + } + + // This test may fail if the subclass of FlinkSourceReaderBase overrides + // the isAvailable() method, which should have a good reason. 
+ @Test + public void testIsAvailableWithoutIdleTimeout() throws Exception { + try (SourceReader>> reader = createReader()) { + reader.start(); + + CompletableFuture future1 = reader.isAvailable(); + assertFalse("Future1 should be uncompleted without live split.", future1.isDone()); + + reader.notifyNoMoreSplits(); + assertTrue("Future1 should be completed upon no more splits notification.", future1.isDone()); + assertTrue( + "The future should be completed without idle timeout.", reader.isAvailable().isDone()); + } + } + + @Test + public void testNumBytesInMetrics() throws Exception { + final int numSplits = 2; + final int numRecordsPerSplit = 10; + List>> splits = + createSplits(numSplits, numRecordsPerSplit, 0); + SourceTestCompat.TestMetricGroup testMetricGroup = new SourceTestCompat.TestMetricGroup(); + try (SourceReader>> reader = + createReader(null, -1L, null, testMetricGroup)) { + pollAndValidate(reader, splits, false); + } + assertEquals(numRecordsPerSplit * numSplits, testMetricGroup.numRecordsInCounter.getCount()); + } + + // --------------- abstract methods --------------- + protected abstract KV getKVPairs(OutputT record); + + protected abstract SourceReader>> createReader( + ScheduledExecutorService executor, + long idleTimeoutMs, + @Nullable Function timestampExtractor, + SourceTestCompat.TestMetricGroup testMetricGroup); + + protected abstract Source> createBeamSource( + int splitIndex, int numRecordsPerSplit); + + // ------------------- protected helper methods ---------------------- + protected SourceReader>> createReader() { + return createReader(null, -1L, null, new SourceTestCompat.TestMetricGroup()); + } + + protected SourceReader>> createReader( + Function timestampExtractor) { + return createReader(null, -1L, timestampExtractor, new SourceTestCompat.TestMetricGroup()); + } + + protected SourceReader>> createReader( + ScheduledExecutorService executor, long idleTimeoutMs) { + return createReader(executor, idleTimeoutMs, null, new SourceTestCompat.TestMetricGroup()); + } + + protected SourceReader>> createReader( + ScheduledExecutorService executor, + long idleTimeoutMs, + Function timestampExtractor) { + return createReader( + executor, idleTimeoutMs, timestampExtractor, new SourceTestCompat.TestMetricGroup()); + } + + protected void pollAndValidate( + SourceReader>> reader, + List>> splits, + boolean validateReceivedTimestamp) + throws Exception { + RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits); + pollAndValidate(reader, splits, validatingOutput, Integer.MAX_VALUE); + if (validateReceivedTimestamp) { + assertTrue(validatingOutput.allTimestampReceived()); + } + } + + protected void pollAndValidate( + SourceReader>> reader, + List>> splits, + RecordsValidatingOutput validatingOutput, + int numRecordsToConsume) + throws Exception { + reader.addSplits(splits); + reader.notifyNoMoreSplits(); + + do { + reader.pollNext(validatingOutput); + } while (!validatingOutput.allRecordsConsumed() + && validatingOutput.numCollectedRecords() < numRecordsToConsume); + } + + protected List>> createSplits( + int numSplits, int numRecordsPerSplit, int startingIndex) { + List>> splitList = new ArrayList<>(); + for (int i = startingIndex; i < numSplits; i++) { + Source> testingSource = createBeamSource(i, numRecordsPerSplit); + splitList.add(new FlinkSourceSplit<>(i, testingSource)); + } + return splitList; + } + + protected void verifyBeamReaderClosed(List>> splits) { + splits.forEach( + split -> { + TestSource source = (TestSource) 
split.getBeamSplitSource(); + assertEquals( + "Should have only one beam BoundedReader created", 1, source.createdReaders().size()); + assertTrue( + "The beam BoundedReader should have been closed", + source.createdReaders().get(0).isClosed()); + }); + } + + protected static SourceReaderContext createSourceReaderContext( + SourceTestCompat.TestMetricGroup metricGroup) { + SourceReaderContext mockContext = Mockito.mock(SourceReaderContext.class); + when(mockContext.metricGroup()).thenReturn(metricGroup); + return mockContext; + } + + // -------------------- protected helper class for fetch result validation --------------------- + protected class RecordsValidatingOutput implements SourceTestCompat.ReaderOutputCompat { + private final List>> sources; + private final Map sourceOutputs; + private int numCollectedRecords = 0; + + public RecordsValidatingOutput(List>> splits) { + this.sources = new ArrayList<>(); + this.sourceOutputs = new HashMap<>(); + splits.forEach(split -> sources.add(split.getBeamSplitSource())); + } + + @Override + public void collect(OutputT record) { + KV kv = getKVPairs(record); + ((TestSource) sources.get(kv.getKey())).validateNextValue(kv.getValue()); + numCollectedRecords++; + } + + @Override + public void collect(OutputT record, long timestamp) { + KV kv = getKVPairs(record); + TestSource testSource = ((TestSource) sources.get(kv.getKey())); + testSource.validateNextValue(kv.getValue()); + testSource.validateNextTimestamp(timestamp); + numCollectedRecords++; + } + + @Override + public void emitWatermark(Watermark watermark) {} + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + + @Override + public SourceOutput createOutputForSplit(String splitId) { + return sourceOutputs.computeIfAbsent(splitId, ignored -> new TestSourceOutput(this)); + } + + @Override + public void releaseOutputForSplit(String splitId) {} + + public int numCollectedRecords() { + return numCollectedRecords; + } + + public boolean allRecordsConsumed() { + boolean allRecordsConsumed = true; + for (Source source : sources) { + allRecordsConsumed = allRecordsConsumed && ((TestSource) source).isConsumptionCompleted(); + } + return allRecordsConsumed; + } + + public boolean allTimestampReceived() { + boolean allTimestampReceived = true; + for (Source source : sources) { + allTimestampReceived = + allTimestampReceived && ((TestSource) source).allTimestampsReceived(); + } + return allTimestampReceived; + } + + public Map createdSourceOutputs() { + return sourceOutputs; + } + } + + protected class TestSourceOutput implements SourceTestCompat.SourceOutputCompat { + private final ReaderOutput output; + private @Nullable Watermark watermark; + private boolean isIdle; + + private TestSourceOutput(RecordsValidatingOutput output) { + this.output = output; + this.watermark = null; + this.isIdle = false; + } + + @Override + public void collect(OutputT record) { + output.collect(record); + } + + @Override + public void collect(OutputT record, long timestamp) { + output.collect(record, timestamp); + } + + @Override + public void emitWatermark(Watermark watermark) { + this.watermark = watermark; + } + + @Override + public void markIdle() { + isIdle = true; + } + + @Override + public void markActive() { + isIdle = false; + } + + public @Nullable Watermark watermark() { + return watermark; + } + + public boolean isIdle() { + return isIdle; + } + } +} diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java new file mode 100644 index 000000000000..59097bae47bd --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumeratorTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; +import org.junit.Test; + +/** Unit tests for {@link FlinkSourceSplitEnumerator}. 
*/ +public class FlinkSourceSplitEnumeratorTest { + + @Test + public void testAssignSplitsWithBoundedSource() throws IOException { + final int numSubtasks = 2; + final int numSplits = 10; + final int totalNumRecords = 10; + TestingSplitEnumeratorContext>> testContext = + new TestingSplitEnumeratorContext<>(numSubtasks); + TestBoundedCountingSource testSource = + new TestBoundedCountingSource(numSplits, totalNumRecords); + + assignSplits(testContext, testSource, numSplits); + assertEquals(numSubtasks, testContext.getSplitAssignments().size()); + + testContext + .getSplitAssignments() + .forEach( + (subtaskId, state) -> { + int expectedNumSplitsPerSubtask = numSplits / numSubtasks; + assertEquals( + "Each subtask should have " + expectedNumSplitsPerSubtask + " assigned splits", + expectedNumSplitsPerSubtask, + state.getAssignedSplits().size()); + assertTrue( + "Each subtask should have received NoMoreSplits", + state.hasReceivedNoMoreSplitsSignal()); + state + .getAssignedSplits() + .forEach( + split -> { + TestBoundedCountingSource source = + (TestBoundedCountingSource) split.getBeamSplitSource(); + try { + int expectedSplitSize = totalNumRecords / numSplits; + assertEquals( + expectedSplitSize, + source.getEstimatedSizeBytes(FlinkPipelineOptions.defaults())); + } catch (Exception e) { + fail("Received exception" + e); + } + }); + }); + } + + @Test + public void testAssignSplitsWithUnboundedSource() throws IOException { + final int numSplits = 10; + final int numSubtasks = 5; + final int numRecordsPerSplit = 10; + TestingSplitEnumeratorContext>> testContext = + new TestingSplitEnumeratorContext<>(numSubtasks); + TestCountingSource testSource = new TestCountingSource(numRecordsPerSplit); + + assignSplits(testContext, testSource, numSplits); + + testContext + .getSplitAssignments() + .forEach( + (subtaskId, state) -> { + int expectedNumSplitsPerSubtask = numSplits / numSubtasks; + assertEquals( + "Each subtask should have " + expectedNumSplitsPerSubtask + " assigned splits", + expectedNumSplitsPerSubtask, + state.getAssignedSplits().size()); + assertTrue( + "Each subtask should have received NoMoreSplits", + state.hasReceivedNoMoreSplitsSignal()); + }); + } + + @Test + public void testAddSplitsBack() throws IOException { + final int numSubtasks = 2; + final int numSplits = 10; + final int totalNumRecords = 10; + TestingSplitEnumeratorContext>> testContext = + new TestingSplitEnumeratorContext<>(numSubtasks); + TestBoundedCountingSource testSource = + new TestBoundedCountingSource(numSplits, totalNumRecords); + try (FlinkSourceSplitEnumerator> splitEnumerator = + new FlinkSourceSplitEnumerator<>( + testContext, testSource, FlinkPipelineOptions.defaults(), numSplits)) { + splitEnumerator.start(); + testContext.registerReader(0, "0"); + splitEnumerator.addReader(0); + testContext.getExecutorService().triggerAll(); + + List>> splitsForReader = + testContext.getSplitAssignments().get(0).getAssignedSplits(); + assertEquals(numSplits / numSubtasks, splitsForReader.size()); + + splitEnumerator.addSplitsBack(splitsForReader, 0); + splitEnumerator.addReader(0); + assertEquals(2 * numSplits / numSubtasks, splitsForReader.size()); + } + } + + private void assignSplits( + TestingSplitEnumeratorContext>> context, + Source> source, + int numSplits) + throws IOException { + try (FlinkSourceSplitEnumerator> splitEnumerator = + new FlinkSourceSplitEnumerator<>( + context, source, FlinkPipelineOptions.defaults(), numSplits)) { + splitEnumerator.start(); + // Add a reader before splitting the beam source. 
+ context.registerReader(0, "0"); + splitEnumerator.addReader(0); + context.getExecutorService().triggerAll(); + context.registerReader(1, "1"); + // Add another reader after splitting the beam source. + splitEnumerator.addReader(1); + } + } +} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java new file mode 100644 index 000000000000..5b8b734a68ae --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/TestSource.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; + +import java.io.Serializable; +import java.util.List; + +public interface TestSource extends Serializable { + + List createdReaders(); + + boolean validateNextValue(int value); + + boolean validateNextTimestamp(long timestamp); + + boolean isConsumptionCompleted(); + + boolean allTimestampsReceived(); + + interface TestReader extends Serializable { + boolean isClosed(); + } +} diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java new file mode 100644 index 000000000000..6303a729652a --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
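Both TestSource implementations in this diff back the validate* methods with monotonically increasing counters, so every value and timestamp must arrive exactly once and in order. The pattern in isolation, as a minimal sketch:

```java
// Mirrors the validation state kept by TestBoundedCountingSource and
// TestCountingSource: each check compares against the expectation, then
// advances it, so out-of-order or duplicate delivery fails validation.
class ValidationCounters {
  private int nextValue;
  private long nextTimestamp;

  boolean validateNextValue(int value) {
    return value == nextValue++;
  }

  boolean validateNextTimestamp(long timestamp) {
    return timestamp == nextTimestamp++;
  }

  boolean isConsumptionCompleted(int totalNumRecords) {
    return nextValue == totalNumRecords;
  }
}
```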
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestBoundedCountingSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/** Unit tests for {@link FlinkBoundedSourceReader}. */
+public class FlinkBoundedSourceReaderTest
+    extends FlinkSourceReaderTestBase<WindowedValue<KV<Integer, Integer>>> {
+
+  @Test
+  public void testPollWithIdleTimeout() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
+        Mockito.mock(ReaderOutput.class);
+    try (FlinkBoundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkBoundedSourceReader<KV<Integer, Integer>>) createReader(executor, 1)) {
+      reader.notifyNoMoreSplits();
+      assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(mockReaderOutput));
+
+      executor.triggerScheduledTasks();
+      assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(mockReaderOutput));
+    }
+  }
+
+  @Test
+  public void testPollWithoutIdleTimeout() throws Exception {
+    ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
+        Mockito.mock(ReaderOutput.class);
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      reader.notifyNoMoreSplits();
+      assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(mockReaderOutput));
+    }
+  }
+
+  @Test
+  public void testIsAvailableOnSplitsAssignment() throws Exception {
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      reader.start();
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("No split assigned yet, should not be available.", future1.isDone());
+
+      // Data available on split assigned.
+      reader.addSplits(createSplits(1, 1, 0));
+      assertTrue("Adding a split should complete future1", future1.isDone());
+      assertTrue("Data should be available with a live split.", reader.isAvailable().isDone());
+    }
+  }
+
+  @Test
+  public void testSnapshotStateAndRestore() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    List<FlinkSourceSplit<KV<Integer, Integer>>> snapshot;
+
+    // Create a reader, take a snapshot.
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      // Only poll half of the records in the first split.
+      pollAndValidate(reader, splits, validatingOutput, numRecordsPerSplit / 2);
+      snapshot = reader.snapshotState(0L);
+    }
+
+    // Create a new validating output because the first split will be consumed from the very
+    // beginning.
+    validatingOutput = new RecordsValidatingOutput(splits);
+    // Create another reader, add the snapshot splits back.
+    try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, snapshot, validatingOutput, Integer.MAX_VALUE);
+    }
+  }
+
+  // --------------- abstract methods impl ----------------
+  @Override
+  protected KV<Integer, Integer> getKVPairs(WindowedValue<KV<Integer, Integer>> record) {
+    return record.getValue();
+  }
+
+  @Override
+  protected Source<KV<Integer, Integer>> createBeamSource(int splitIndex, int numRecordsPerSplit) {
+    return new TestBoundedCountingSource(splitIndex, numRecordsPerSplit);
+  }
+
+  @Override
+  protected FlinkBoundedSourceReader<KV<Integer, Integer>> createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      @Nullable Function<WindowedValue<KV<Integer, Integer>>, Long> timestampExtractor,
+      TestMetricGroup testMetricGroup) {
+    FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
+    pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
+    SourceReaderContext mockContext = createSourceReaderContext(testMetricGroup);
+    if (executor != null) {
+      return new FlinkBoundedSourceReader<>(
+          mockContext, pipelineOptions, executor, timestampExtractor);
+    } else {
+      return new FlinkBoundedSourceReader<>(mockContext, pipelineOptions, timestampExtractor);
+    }
+  }
+}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
new file mode 100644
index 000000000000..f420bd8900ff
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
+
+import static org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSourceReader.PENDING_BYTES_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestCountingSource;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderTestBase;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.SourceTestCompat.TestMetricGroup;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.metrics.Gauge;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Test;
+
+/** Unit tests for {@link FlinkUnboundedSourceReader}. */
+public class FlinkUnboundedSourceReaderTest
+    extends FlinkSourceReaderTestBase<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> {
+
+  @Test
+  public void testSnapshotStateAndRestore() throws Exception {
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    List<FlinkSourceSplit<KV<Integer, Integer>>> snapshot;
+
+    // Create a reader, take a snapshot.
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, splits, validatingOutput, numSplits * numRecordsPerSplit / 2);
+      snapshot = reader.snapshotState(0L);
+    }
+
+    // Create another reader, add the snapshot splits back.
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      pollAndValidate(reader, snapshot, validatingOutput, Integer.MAX_VALUE);
+    }
+  }
+
+  /**
+   * This is a concurrency correctness test. It verifies that the main thread is always woken up by
+   * the alarm runner executed in the executor thread.
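+   *
+   * <p>The main thread repeatedly blocks on the future returned by {@code isAvailable()}, while
+   * the executor thread keeps firing the scheduled wake-up task. If a wake-up were ever lost, the
+   * main thread would block forever and the test would hit its timeout.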
+   */
+  @Test(timeout = 30000L)
+  public void testIsAvailableAlwaysWakenUp() throws Exception {
+    final int numFuturesRequired = 1_000_000;
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits = new ArrayList<>();
+    splits.add(new FlinkSourceSplit<>(0, new DummySource(Integer.MAX_VALUE)));
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, Long.MAX_VALUE)) {
+      reader.start();
+      reader.addSplits(splits);
+
+      Thread mainThread =
+          new Thread(
+              () -> {
+                try {
+                  while (futures.size() < numFuturesRequired) {
+                    // This poll will return NOTHING_AVAILABLE after each record emission.
+                    if (reader.pollNext(validatingOutput) == InputStatus.NOTHING_AVAILABLE) {
+                      CompletableFuture<Void> future = reader.isAvailable();
+                      future.get();
+                      futures.add(future);
+                    }
+                  }
+                } catch (Exception e) {
+                  if (!exceptionRef.compareAndSet(null, e)) {
+                    exceptionRef.get().addSuppressed(e);
+                  }
+                }
+              },
+              "MainThread");
+
+      Thread executorThread =
+          new Thread(
+              () -> {
+                while (futures.size() < numFuturesRequired) {
+                  executor.triggerScheduledTasks();
+                }
+              },
+              "ExecutorThread");
+
+      mainThread.start();
+      executorThread.start();
+      executorThread.join();
+    }
+  }
+
+  @Test
+  public void testIsAvailableOnSplitChangeWhenNoDataAvailableForAliveReaders() throws Exception {
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits1 = new ArrayList<>();
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits2 = new ArrayList<>();
+    splits1.add(new FlinkSourceSplit<>(0, new DummySource(0)));
+    splits2.add(new FlinkSourceSplit<>(1, new DummySource(0)));
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits1);
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, Long.MAX_VALUE)) {
+      reader.start();
+      reader.addSplits(splits1);
+
+      assertEquals(
+          "The reader should have nothing available",
+          InputStatus.NOTHING_AVAILABLE,
+          reader.pollNext(validatingOutput));
+
+      CompletableFuture<Void> future1 = reader.isAvailable();
+      assertFalse("Future1 should not be completed without a live split.", future1.isDone());
+
+      reader.addSplits(splits2);
+      assertTrue("Future1 should be completed upon addition of new splits.", future1.isDone());
+
+      CompletableFuture<Void> future2 = reader.isAvailable();
+      assertFalse("Future2 should not be completed without a live split.", future2.isDone());
+
+      reader.notifyNoMoreSplits();
+      assertTrue(
+          "Future2 should be completed upon the no-more-splits notification.", future2.isDone());
+    }
+  }
+
+  @Test
+  public void testWatermark() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkUnboundedSourceReader<KV<Integer, Integer>>) createReader(executor, -1L)) {
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
+      RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+
+      reader.start();
+      reader.addSplits(splits);
+
+      // Poll 3 records from split 0 and 2 records from split 1.
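+      // (The reader round-robins across the two splits starting from split 0, hence the 3/2
+      // distribution after five polls.)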
+      for (int i = 0; i < 5; i++) {
+        reader.pollNext(validatingOutput);
+      }
+
+      Map<String, TestSourceOutput> sourceOutputs = validatingOutput.createdSourceOutputs();
+      assertEquals("There should be 2 source outputs created.", 2, sourceOutputs.size());
+      assertNull(sourceOutputs.get("0").watermark());
+      assertNull(sourceOutputs.get("1").watermark());
+
+      // Trigger the periodic task marking the watermark emission flag.
+      executor.triggerScheduledTasks();
+      // Poll one more time to actually emit the watermark. Getting record value 2 from split_1.
+      reader.pollNext(validatingOutput);
+
+      assertEquals(3, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(2, sourceOutputs.get("1").watermark().getTimestamp());
+
+      // Poll one more time to ensure no additional watermark is emitted. Getting record value 3
+      // from split_0.
+      reader.pollNext(validatingOutput);
+      assertEquals(3, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(2, sourceOutputs.get("1").watermark().getTimestamp());
+
+      // Trigger the task to mark the watermark emission flag again.
+      executor.triggerScheduledTasks();
+      // Poll to actually emit the watermark. Getting (split_1 -> 3).
+      reader.pollNext(validatingOutput);
+
+      assertEquals(4, sourceOutputs.get("0").watermark().getTimestamp());
+      assertEquals(3, sourceOutputs.get("1").watermark().getTimestamp());
+    }
+  }
+
+  @Test
+  public void testPendingBytesMetric() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    TestMetricGroup testMetricGroup = new TestMetricGroup();
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader(executor, 0L, null, testMetricGroup)) {
+      reader.start();
+
+      List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createSplits(2, 10, 0);
+      reader.addSplits(splits);
+      RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+
+      // Need to poll once to create all the readers.
+      reader.pollNext(validatingOutput);
+      Gauge<Long> pendingBytesGauge =
+          (Gauge<Long>) testMetricGroup.registeredGauge.get(PENDING_BYTES_METRIC_NAME);
+      assertNotNull(pendingBytesGauge);
+      // The TestCountingSource.CountingSourceReader always reports 7L as its backlog bytes.
+      // Because we have 2 splits, the expected value is 2 * 7L = 14.
+      assertEquals(14L, pendingBytesGauge.getValue().longValue());
+    }
+  }
+
+  // --------------- private helper classes -----------------
+  /** A source whose advance() method only returns true occasionally. */
+  private static class DummySource extends TestCountingSource {
+
+    public DummySource(int numMessagesPerShard) {
+      super(numMessagesPerShard);
+    }
+
+    @Override
+    public CountingSourceReader createReader(
+        PipelineOptions options, @Nullable CounterMark checkpointMark) {
+      CountingSourceReader reader = new DummySourceReader();
+      createdReaders().add(reader);
+      return reader;
+    }
+
+    private class DummySourceReader extends TestCountingSource.CountingSourceReader {
+      private final Random random = new Random();
+
+      public DummySourceReader() {
+        super(0);
+      }
+
+      @Override
+      public boolean advance() {
+        // Return true once every three times advance is invoked.
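+        // (On average, that is: each call succeeds with probability 1/3 rather than on a strict
+        // every-third-call schedule.)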
+        if (random.nextInt(3) == 0) {
+          return super.advance();
+        } else {
+          return false;
+        }
+      }
+    }
+  }
+
+  // --------------- abstract methods impl ------------------
+  @Override
+  protected KV<Integer, Integer> getKVPairs(
+      WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> record) {
+    return record.getValue().getValue();
+  }
+
+  @Override
+  protected FlinkUnboundedSourceReader<KV<Integer, Integer>> createReader(
+      ScheduledExecutorService executor,
+      long idleTimeoutMs,
+      Function<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, Long> timestampExtractor,
+      TestMetricGroup metricGroup) {
+    FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
+    pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
+    pipelineOptions.setAutoWatermarkInterval(10L);
+    SourceReaderContext mockContext = createSourceReaderContext(metricGroup);
+    if (executor != null) {
+      return new FlinkUnboundedSourceReader<>(
+          mockContext, pipelineOptions, executor, timestampExtractor);
+    } else {
+      return new FlinkUnboundedSourceReader<>(mockContext, pipelineOptions, timestampExtractor);
+    }
+  }
+
+  @Override
+  protected Source<KV<Integer, Integer>> createBeamSource(int splitIndex, int numRecordsPerSplit) {
+    return new TestCountingSource(numRecordsPerSplit)
+        .withShardNumber(splitIndex)
+        .withFixedNumSplits(1);
+  }
+}