diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
index f0b93e0dde0f..ce4404f8ce9a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -42,7 +40,6 @@
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -149,19 +146,11 @@ public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
     // Add all the source splits being actively read.
     beamSourceReaders.forEach(
         (splitId, readerAndOutput) -> {
-          Source.Reader<T> reader = readerAndOutput.reader;
-          if (reader instanceof BoundedSource.BoundedReader) {
-            // Sometimes users may decide to run a bounded source in streaming mode as "finite
-            // stream."
-            // For bounded source, the checkpoint granularity is the entire source split.
-            // So, in case of failure, all the data from this split will be consumed again.
-            splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
-          } else if (reader instanceof UnboundedSource.UnboundedReader) {
-            // The checkpoint for unbounded sources is fine granular.
-            byte[] checkpointState =
-                getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<T>) reader);
-            splitsState.add(
-                new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
+          try {
+            splitsState.add(getReaderCheckpoint(splitId, readerAndOutput));
+          } catch (IOException e) {
+            throw new IllegalStateException(
+                String.format("Failed to get checkpoint for split %d", splitId), e);
           }
         });
     return splitsState;
@@ -228,9 +217,17 @@ public void close() throws Exception {
    */
   protected abstract CompletableFuture<Void> isAvailableForAliveReaders();
 
+  /** Create {@link FlinkSourceSplit} for given {@code splitId}. */
+  protected abstract FlinkSourceSplit<T> getReaderCheckpoint(
+      int splitId, ReaderAndOutput readerAndOutput) throws IOException;
+
+  /** Create {@link Source.Reader} for given {@link FlinkSourceSplit}. */
+  protected abstract Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException;
+
   // ----------------- protected helper methods for subclasses --------------------
 
-  protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
+  protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
     FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
     if (sourceSplit != null) {
       Source.Reader<T> reader = createReader(sourceSplit);
@@ -241,7 +238,7 @@ protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOExceptio
     return Optional.empty();
   }
 
-  protected void finishSplit(int splitIndex) throws IOException {
+  protected final void finishSplit(int splitIndex) throws IOException {
     ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
     if (readerAndOutput != null) {
       LOG.info("Finished reading from split {}", readerAndOutput.splitId);
@@ -252,7 +249,7 @@ protected void finishSplit(int splitIndex) throws IOException {
     }
   }
 
-  protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
+  protected final boolean checkIdleTimeoutAndMaybeStartCountdown() {
     if (idleTimeoutMs <= 0) {
       idleTimeoutFuture.complete(null);
     } else if (!idleTimeoutCountingDown) {
@@ -262,7 +259,7 @@ protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
     return idleTimeoutFuture.isDone();
   }
 
-  protected boolean noMoreSplits() {
+  protected final boolean noMoreSplits() {
     return noMoreSplits;
   }
 
@@ -308,49 +305,6 @@ protected Map<Integer, ReaderAndOutput> allReaders() {
   protected static void ignoreReturnValue(Object o) {
     // do nothing.
   }
-  // ------------------------------ private methods ------------------------------
-
-  @SuppressWarnings("unchecked")
-  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
-      byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<T> reader) {
-    UnboundedSource<T, CheckpointMarkT> source =
-        (UnboundedSource<T, CheckpointMarkT>) reader.getCurrentSource();
-    CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
-    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      coder.encode(checkpointMark, baos);
-      return baos.toByteArray();
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
-    }
-  }
-
-  private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
-      throws IOException {
-    Source<T> beamSource = sourceSplit.getBeamSplitSource();
-    if (beamSource instanceof BoundedSource) {
-      return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
-    } else if (beamSource instanceof UnboundedSource) {
-      return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
-    } else {
-      throw new IllegalStateException("Unknown source type " + beamSource.getClass());
-    }
-  }
-
-  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
-      Source.Reader<T> createUnboundedSourceReader(
-          Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
-    UnboundedSource<T, CheckpointMarkT> unboundedSource =
-        (UnboundedSource<T, CheckpointMarkT>) beamSource;
-    Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
-    if (splitState == null) {
-      return unboundedSource.createReader(pipelineOptions, null);
-    } else {
-      try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
-        return unboundedSource.createReader(pipelineOptions, coder.decode(bais));
-      }
-    }
-  }
 
   // -------------------- protected helper class ---------------------
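Note: the stream-based encode/decode helpers deleted above are not gone; they move, essentially unchanged, into FlinkUnboundedSourceReader further down, while the bounded reader gains its own split state. As a minimal standalone sketch of the coder round trip those helpers perform, with VarLongCoder standing in for a real checkpoint-mark coder (the class and method names below are illustrative, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;

public class CheckpointBytesRoundTrip {
  // Encode a value to bytes the way getAndEncodeCheckpointMark does.
  static <X> byte[] encode(Coder<X> coder, X value) throws IOException {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      coder.encode(value, baos);
      return baos.toByteArray();
    }
  }

  // Decode the bytes back, mirroring createUnboundedSourceReader's restore path.
  static <X> X decode(Coder<X> coder, byte[] bytes) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
      return coder.decode(bais);
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] bytes = encode(VarLongCoder.of(), 42L);
    System.out.println(decode(VarLongCoder.of(), bytes)); // prints 42
  }
}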
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
index 292697479bcd..8ceab393533d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
@@ -121,7 +121,6 @@ public void addReader(int subtaskId) {
     List<FlinkSourceSplit<T>> splitsForSubtask = pendingSplits.remove(subtaskId);
     if (splitsForSubtask != null) {
       assignSplitsAndLog(splitsForSubtask, subtaskId);
-      pendingSplits.remove(subtaskId);
     } else {
       if (splitsInitialized) {
         LOG.info("There is no split for subtask {}. Signaling no more splits.", subtaskId);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
index b015b527aa45..a25964af809d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
@@ -18,18 +18,29 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.core.io.InputStatus;
@@ -50,6 +61,8 @@
  */
 public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
+  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+  private final Map<Integer, Long> consumedFromSplit = new HashMap<>();
   private @Nullable Source.Reader<T> currentReader;
   private int currentSplitId;
 
@@ -62,6 +75,40 @@ public FlinkBoundedSourceReader(
     currentSplitId = -1;
   }
 
+  @Override
+  protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput)
+      throws CoderException {
+    // Sometimes users may decide to run a bounded source in streaming mode as "finite
+    // stream."
+    // For bounded source, the checkpoint granularity is the entire source split.
+    // So, in case of failure, all the data from this split will be consumed again.
+    return new FlinkSourceSplit<>(
+        splitId, readerAndOutput.reader.getCurrentSource(), asBytes(consumedFromSplit(splitId)));
+  }
+
+  @Override
+  protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException {
+    Source<T> beamSource = sourceSplit.getBeamSplitSource();
+    byte[] state = sourceSplit.getSplitState();
+    if (state != null) {
+      consumedFromSplit.put(Integer.parseInt(sourceSplit.splitId()), fromBytes(state));
+    }
+    return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
+  }
+
+  private byte[] asBytes(long l) throws CoderException {
+    return CoderUtils.encodeToByteArray(LONG_CODER, l);
+  }
+
+  private long fromBytes(byte[] b) throws CoderException {
+    return CoderUtils.decodeFromByteArray(LONG_CODER, b);
+  }
+
+  private long consumedFromSplit(int splitId) {
+    return consumedFromSplit.getOrDefault(splitId, 0L);
+  }
+
   @VisibleForTesting
   protected FlinkBoundedSourceReader(
       String stepName,
@@ -78,26 +125,28 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Except
     checkExceptionAndMaybeThrow();
     if (currentReader == null && !moveToNextNonEmptyReader()) {
       // Nothing to read for now.
-      if (noMoreSplits() && checkIdleTimeoutAndMaybeStartCountdown()) {
-        // All the source splits have been read and idle timeout has passed.
-        LOG.info(
-            "All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs);
-        return InputStatus.END_OF_INPUT;
-      } else {
-        // This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle
-        // timeout.
-        return InputStatus.NOTHING_AVAILABLE;
+      if (noMoreSplits()) {
+        output.emitWatermark(Watermark.MAX_WATERMARK);
+        if (checkIdleTimeoutAndMaybeStartCountdown()) {
+          // All the source splits have been read and idle timeout has passed.
+          LOG.info(
+              "All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs);
+          return InputStatus.END_OF_INPUT;
+        }
       }
+      // This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle
+      // timeout.
+      return InputStatus.NOTHING_AVAILABLE;
     }
-    Source.Reader<T> tempCurrentReader = currentReader;
-    if (tempCurrentReader != null) {
-      T record = tempCurrentReader.getCurrent();
+    if (currentReader != null) {
+      // make null checks happy
+      final @Nonnull Source.Reader<T> splitReader = currentReader;
+      // store number of processed elements from this split
+      consumedFromSplit.compute(currentSplitId, (k, v) -> v == null ? 1 : v + 1);
+      T record = splitReader.getCurrent();
       WindowedValue<T> windowedValue =
           WindowedValue.of(
-              record,
-              tempCurrentReader.getCurrentTimestamp(),
-              GlobalWindow.INSTANCE,
-              PaneInfo.NO_FIRING);
+              record, splitReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
       if (timestampExtractor == null) {
         output.collect(windowedValue);
       } else {
@@ -107,11 +156,12 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Except
       // If the advance() invocation throws exception here, the job will just fail over and read
       // everything again from
       // the beginning. So the failover granularity is the entire Flink job.
-      if (!invocationUtil.invokeAdvance(tempCurrentReader)) {
+      if (!invocationUtil.invokeAdvance(splitReader)) {
         finishSplit(currentSplitId);
+        consumedFromSplit.remove(currentSplitId);
+        LOG.debug("Finished reading from {}", currentSplitId);
         currentReader = null;
         currentSplitId = -1;
-        LOG.debug("Finished reading from {}", currentSplitId);
       }
       // Always return MORE_AVAILABLE here regardless of the availability of next record. If there
       // is no more
@@ -138,6 +188,12 @@ private boolean moveToNextNonEmptyReader() throws IOException {
     if (invocationUtil.invokeStart(rao.reader)) {
       currentSplitId = Integer.parseInt(rao.splitId);
       currentReader = rao.reader;
+      long toSkipAfterStart =
+          MoreObjects.firstNonNull(consumedFromSplit.remove(currentSplitId), 0L);
+      @Nonnull Source.Reader<T> reader = Preconditions.checkArgumentNotNull(currentReader);
+      while (toSkipAfterStart > 0 && reader.advance()) {
+        toSkipAfterStart--;
+      }
       return true;
     } else {
       finishSplit(Integer.parseInt(rao.splitId));
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
index 0a7acb669efd..7b02702e244c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -25,9 +27,12 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -179,6 +184,22 @@ protected CompletableFuture<Void> isAvailableForAliveReaders() {
     }
   }
 
+  @Override
+  protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput) {
+    // The checkpoint for unbounded sources is fine granular.
+    byte[] checkpointState =
+        getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<T>) readerAndOutput.reader);
+    return new FlinkSourceSplit<>(
+        splitId, readerAndOutput.reader.getCurrentSource(), checkpointState);
+  }
+
+  @Override
+  protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
+      throws IOException {
+    Source<T> beamSource = sourceSplit.getBeamSplitSource();
+    return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
+  }
+
   // -------------- private helper methods ----------------
 
   private void emitRecord(
@@ -274,4 +295,34 @@ private void createPendingBytesGauge(SourceReaderContext context) {
           return pendingBytes;
         });
   }
+
+  @SuppressWarnings("unchecked")
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<T> reader) {
+    UnboundedSource<T, CheckpointMarkT> source =
+        (UnboundedSource<T, CheckpointMarkT>) reader.getCurrentSource();
+    CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
+    Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      coder.encode(checkpointMark, baos);
+      return baos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
+    }
+  }
+
+  private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      Source.Reader<T> createUnboundedSourceReader(
+          Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
+    UnboundedSource<T, CheckpointMarkT> unboundedSource =
+        (UnboundedSource<T, CheckpointMarkT>) beamSource;
+    Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
+    if (splitState == null) {
+      return unboundedSource.createReader(pipelineOptions, null);
+    } else {
+      try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
+        return unboundedSource.createReader(pipelineOptions, coder.decode(bais));
+      }
+    }
+  }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
index 462a1ba0153d..c635a5778b5c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderTestBase.java
@@ -363,11 +363,9 @@ public int numCollectedRecords() {
     }
 
     public boolean allRecordsConsumed() {
-      boolean allRecordsConsumed = true;
-      for (Source source : sources) {
-        allRecordsConsumed = allRecordsConsumed && ((TestSource) source).isConsumptionCompleted();
-      }
-      return allRecordsConsumed;
+      return sources.stream()
+          .map(TestSource.class::cast)
+          .allMatch(TestSource::isConsumptionCompleted);
     }
 
     public boolean allTimestampReceived() {
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
index 84cb2a72ddaf..022f1abde826 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -34,6 +35,7 @@
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -62,6 +64,20 @@ public void testPollWithIdleTimeout() throws Exception {
     }
   }
 
+  @Test
+  public void testPollEmitsMaxWatermark() throws Exception {
+    ManuallyTriggeredScheduledExecutorService executor =
+        new ManuallyTriggeredScheduledExecutorService();
+    ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
+        Mockito.mock(ReaderOutput.class);
+    try (FlinkBoundedSourceReader<KV<Integer, Integer>> reader =
+        (FlinkBoundedSourceReader<KV<Integer, Integer>>) createReader(executor, Long.MAX_VALUE)) {
+      reader.notifyNoMoreSplits();
+      assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(mockReaderOutput));
+      verify(mockReaderOutput).emitWatermark(Watermark.MAX_WATERMARK);
+    }
+  }
+
   @Test
   public void testPollWithoutIdleTimeout() throws Exception {
     ReaderOutput<WindowedValue<KV<Integer, Integer>>> mockReaderOutput =
@@ -107,8 +123,6 @@ public void testSnapshotStateAndRestore() throws Exception {
       snapshot = reader.snapshotState(0L);
     }
 
-    // Create a new validating output because the first split will be consumed from very beginning.
-    validatingOutput = new RecordsValidatingOutput(splits);
     // Create another reader, add the snapshot splits back.
     try (SourceReader<WindowedValue<KV<Integer, Integer>>, FlinkSourceSplit<KV<Integer, Integer>>>
         reader = createReader()) {
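For reference, the scheme the bounded-reader changes above implement: getReaderCheckpoint persists only the number of records already consumed from each split (encoded with VarLongCoder), and after a restore moveToNextNonEmptyReader advances the recreated reader past that many records. A minimal sketch of the asBytes/fromBytes round trip (the class name and the count 42 are illustrative):

import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.CoderUtils;

public class ConsumedCountRoundTrip {
  public static void main(String[] args) throws CoderException {
    VarLongCoder coder = VarLongCoder.of();
    // What getReaderCheckpoint stores as split state for a split that consumed 42 records.
    byte[] splitState = CoderUtils.encodeToByteArray(coder, 42L);
    // What createReader reads back; the reader is then fast-forwarded this many records.
    long toSkipAfterStart = CoderUtils.decodeFromByteArray(coder, splitState);
    System.out.println(toSkipAfterStart); // prints 42
  }
}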