@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -42,7 +40,6 @@
import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.FlinkSourceCompat;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -149,19 +146,11 @@ public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
// Add all the source splits being actively read.
beamSourceReaders.forEach(
(splitId, readerAndOutput) -> {
Source.Reader<T> reader = readerAndOutput.reader;
if (reader instanceof BoundedSource.BoundedReader) {
// Sometimes users may decide to run a bounded source in streaming mode as "finite
// stream."
// For bounded source, the checkpoint granularity is the entire source split.
// So, in case of failure, all the data from this split will be consumed again.
splitsState.add(new FlinkSourceSplit<>(splitId, reader.getCurrentSource()));
} else if (reader instanceof UnboundedSource.UnboundedReader) {
// The checkpoint for unbounded sources is fine granular.
byte[] checkpointState =
getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<OutputT>) reader);
splitsState.add(
new FlinkSourceSplit<>(splitId, reader.getCurrentSource(), checkpointState));
try {
splitsState.add(getReaderCheckpoint(splitId, readerAndOutput));
} catch (IOException e) {
throw new IllegalStateException(
String.format("Failed to get checkpoint for split %d", splitId), e);
}
});
return splitsState;
@@ -228,9 +217,17 @@ public void close() throws Exception {
*/
protected abstract CompletableFuture<Void> isAvailableForAliveReaders();

/** Create {@link FlinkSourceSplit} for given {@code splitId}. */
protected abstract FlinkSourceSplit<T> getReaderCheckpoint(
int splitId, ReaderAndOutput readerAndOutput) throws IOException;

/** Create {@link Source.Reader} for given {@link FlinkSourceSplit}. */
protected abstract Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
throws IOException;

// ----------------- protected helper methods for subclasses --------------------

protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
if (sourceSplit != null) {
Source.Reader<T> reader = createReader(sourceSplit);
@@ -241,7 +238,7 @@ protected Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
return Optional.empty();
}

protected void finishSplit(int splitIndex) throws IOException {
protected final void finishSplit(int splitIndex) throws IOException {
ReaderAndOutput readerAndOutput = beamSourceReaders.remove(splitIndex);
if (readerAndOutput != null) {
LOG.info("Finished reading from split {}", readerAndOutput.splitId);
@@ -252,7 +249,7 @@ protected void finishSplit(int splitIndex) throws IOException {
}
}

protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
protected final boolean checkIdleTimeoutAndMaybeStartCountdown() {
if (idleTimeoutMs <= 0) {
idleTimeoutFuture.complete(null);
} else if (!idleTimeoutCountingDown) {
@@ -262,7 +259,7 @@ protected boolean checkIdleTimeoutAndMaybeStartCountdown() {
return idleTimeoutFuture.isDone();
}

protected boolean noMoreSplits() {
protected final boolean noMoreSplits() {
return noMoreSplits;
}

@@ -308,49 +305,6 @@ protected Map<Integer, ReaderAndOutput> allReaders() {
protected static void ignoreReturnValue(Object o) {
// do nothing.
}
// ------------------------------ private methods ------------------------------

@SuppressWarnings("unchecked")
private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<OutputT> reader) {
UnboundedSource<OutputT, CheckpointMarkT> source =
(UnboundedSource<OutputT, CheckpointMarkT>) reader.getCurrentSource();
CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
coder.encode(checkpointMark, baos);
return baos.toByteArray();
} catch (IOException ioe) {
throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
}
}

private Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
throws IOException {
Source<T> beamSource = sourceSplit.getBeamSplitSource();
if (beamSource instanceof BoundedSource) {
return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
} else if (beamSource instanceof UnboundedSource) {
return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
} else {
throw new IllegalStateException("Unknown source type " + beamSource.getClass());
}
}

private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
Source.Reader<T> createUnboundedSourceReader(
Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
UnboundedSource<T, CheckpointMarkT> unboundedSource =
(UnboundedSource<T, CheckpointMarkT>) beamSource;
Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
if (splitState == null) {
return unboundedSource.createReader(pipelineOptions, null);
} else {
try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
return unboundedSource.createReader(pipelineOptions, coder.decode(bais));
}
}
}

// -------------------- protected helper class ---------------------

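A condensed view of the resulting contract in FlinkSourceReaderBase: snapshotState(...) is now a template method that owns the iteration over live readers, while each concrete reader decides what its per-split checkpoint looks like and how to rebuild a reader from one. Below is a minimal, hypothetical subclass sketching the two new extension points; the constructor and the remaining abstract methods (e.g. isAvailableForAliveReaders()) are omitted, and for this toy source the split itself is the entire checkpoint.

class ToyBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
  @Override
  protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput) {
    // Checkpoint at split granularity: recovery simply re-reads the whole split.
    return new FlinkSourceSplit<>(splitId, readerAndOutput.reader.getCurrentSource());
  }

  @Override
  protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
      throws IOException {
    // No split state to restore; open the bounded split from the beginning.
    return ((BoundedSource<T>) sourceSplit.getBeamSplitSource()).createReader(pipelineOptions);
  }
}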
@@ -121,7 +121,6 @@ public void addReader(int subtaskId) {
List<FlinkSourceSplit<T>> splitsForSubtask = pendingSplits.remove(subtaskId);
if (splitsForSubtask != null) {
assignSplitsAndLog(splitsForSubtask, subtaskId);
pendingSplits.remove(subtaskId);
} else {
if (splitsInitialized) {
LOG.info("There is no split for subtask {}. Signaling no more splits.", subtaskId);
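The line dropped from addReader(...) was a no-op: java.util.Map#remove already unmaps the key when its return value is captured, so the second remove could never find anything. A two-line illustration:

Map<Integer, List<String>> pendingSplits = new HashMap<>();
pendingSplits.put(1, List.of("split-0"));
List<String> splitsForSubtask = pendingSplits.remove(1); // returns the list and unmaps key 1
pendingSplits.remove(1);                                 // redundant: returns null, changes nothing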
@@ -18,18 +18,29 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
@@ -50,6 +61,8 @@
*/
public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
private static final VarLongCoder LONG_CODER = VarLongCoder.of();
private final Map<Integer, Long> consumedFromSplit = new HashMap<>();
private @Nullable Source.Reader<T> currentReader;
private int currentSplitId;

@@ -62,6 +75,40 @@ public FlinkBoundedSourceReader(
currentSplitId = -1;
}

@Override
protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput)
throws CoderException {
// Sometimes users may decide to run a bounded source in streaming mode as a
// "finite stream."
// For a bounded source, the checkpoint granularity is the entire source split: on
// failure the split is re-read from the beginning, and the consumed-element count
// stored with the split state is used to skip records that were already emitted.
return new FlinkSourceSplit<>(
splitId, readerAndOutput.reader.getCurrentSource(), asBytes(consumedFromSplit(splitId)));
}

@Override
protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
throws IOException {
Source<T> beamSource = sourceSplit.getBeamSplitSource();
byte[] state = sourceSplit.getSplitState();
if (state != null) {
consumedFromSplit.put(Integer.parseInt(sourceSplit.splitId()), fromBytes(state));
}
return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
}

private byte[] asBytes(long l) throws CoderException {
return CoderUtils.encodeToByteArray(LONG_CODER, l);
}

private long fromBytes(byte[] b) throws CoderException {
return CoderUtils.decodeFromByteArray(LONG_CODER, b);
}

private long consumedFromSplit(int splitId) {
return consumedFromSplit.getOrDefault(splitId, 0L);
}

@VisibleForTesting
protected FlinkBoundedSourceReader(
String stepName,
@@ -78,26 +125,28 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
checkExceptionAndMaybeThrow();
if (currentReader == null && !moveToNextNonEmptyReader()) {
// Nothing to read for now.
if (noMoreSplits() && checkIdleTimeoutAndMaybeStartCountdown()) {
// All the source splits have been read and idle timeout has passed.
LOG.info(
"All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs);
return InputStatus.END_OF_INPUT;
} else {
// This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle
// timeout.
return InputStatus.NOTHING_AVAILABLE;
if (noMoreSplits()) {
output.emitWatermark(Watermark.MAX_WATERMARK);
if (checkIdleTimeoutAndMaybeStartCountdown()) {
// All the source splits have been read and idle timeout has passed.
LOG.info(
"All splits have finished reading, and idle time {} ms has passed.", idleTimeoutMs);
return InputStatus.END_OF_INPUT;
}
}
// This reader either hasn't received NoMoreSplitsEvent yet or it is waiting for idle
// timeout.
return InputStatus.NOTHING_AVAILABLE;
}
Source.Reader<T> tempCurrentReader = currentReader;
if (tempCurrentReader != null) {
T record = tempCurrentReader.getCurrent();
if (currentReader != null) {
// make null checks happy
final @Nonnull Source.Reader<T> splitReader = currentReader;
// store number of processed elements from this split
consumedFromSplit.compute(currentSplitId, (k, v) -> v == null ? 1 : v + 1);
T record = splitReader.getCurrent();
WindowedValue<T> windowedValue =
WindowedValue.of(
record,
tempCurrentReader.getCurrentTimestamp(),
GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING);
record, splitReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
if (timestampExtractor == null) {
output.collect(windowedValue);
} else {
@@ -107,11 +156,12 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
// If the advance() invocation throws an exception here, the job will just fail over
// and read everything again from the beginning. So the failover granularity is the
// entire Flink job.
if (!invocationUtil.invokeAdvance(tempCurrentReader)) {
if (!invocationUtil.invokeAdvance(splitReader)) {
finishSplit(currentSplitId);
consumedFromSplit.remove(currentSplitId);
LOG.debug("Finished reading from {}", currentSplitId);
currentReader = null;
currentSplitId = -1;
LOG.debug("Finished reading from {}", currentSplitId);
}
// Always return MORE_AVAILABLE here regardless of the availability of next record. If there
// is no more
@@ -138,6 +188,12 @@ private boolean moveToNextNonEmptyReader() throws IOException {
if (invocationUtil.invokeStart(rao.reader)) {
currentSplitId = Integer.parseInt(rao.splitId);
currentReader = rao.reader;
long toSkipAfterStart =
MoreObjects.firstNonNull(consumedFromSplit.remove(currentSplitId), 0L);
@Nonnull Source.Reader<T> reader = Preconditions.checkArgumentNotNull(currentReader);
while (toSkipAfterStart > 0 && reader.advance()) {
toSkipAfterStart--;
}
return true;
} else {
finishSplit(Integer.parseInt(rao.splitId));
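The substance of the bounded-reader change is resumable splits: each checkpoint now carries the number of elements the split has already emitted, VarLong-encoded into the split state, and after recovery the reader re-opens the split and fast-forwards past that prefix. The re-read still happens, but already-emitted records are suppressed. A condensed sketch of the round trip, with hypothetical helper names:

static final VarLongCoder LONG_CODER = VarLongCoder.of();

// Checkpoint side: the split state is just the count of emitted elements.
static byte[] checkpointState(long consumed) throws CoderException {
  return CoderUtils.encodeToByteArray(LONG_CODER, consumed);
}

// Restore side: re-open the split, then skip what was consumed before the
// failure, mirroring the skip loop in moveToNextNonEmptyReader().
static <T> boolean startAndSkip(BoundedSource.BoundedReader<T> reader, byte[] state)
    throws IOException {
  long toSkip = CoderUtils.decodeFromByteArray(LONG_CODER, state);
  boolean hasNext = reader.start();
  while (toSkip > 0 && hasNext) {
    hasNext = reader.advance();
    toSkip--;
  }
  return hasNext; // true: positioned on the first element not yet emitted
}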
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -25,9 +27,12 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -179,6 +184,22 @@ protected CompletableFuture<Void> isAvailableForAliveReaders() {
}
}

@Override
protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput) {
// The checkpoint for unbounded sources is fine granular.
byte[] checkpointState =
getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader<T>) readerAndOutput.reader);
return new FlinkSourceSplit<>(
splitId, readerAndOutput.reader.getCurrentSource(), checkpointState);
}

@Override
protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
throws IOException {
Source<T> beamSource = sourceSplit.getBeamSplitSource();
return createUnboundedSourceReader(beamSource, sourceSplit.getSplitState());
}

// -------------- private helper methods ----------------

private void emitRecord(
Expand Down Expand Up @@ -274,4 +295,34 @@ private void createPendingBytesGauge(SourceReaderContext context) {
return pendingBytes;
});
}

@SuppressWarnings("unchecked")
private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<T> reader) {
UnboundedSource<T, CheckpointMarkT> source =
(UnboundedSource<T, CheckpointMarkT>) reader.getCurrentSource();
CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark();
Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
coder.encode(checkpointMark, baos);
return baos.toByteArray();
} catch (IOException ioe) {
throw new RuntimeException("Failed to encode checkpoint mark.", ioe);
}
}

private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
Source.Reader<T> createUnboundedSourceReader(
Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
UnboundedSource<T, CheckpointMarkT> unboundedSource =
(UnboundedSource<T, CheckpointMarkT>) beamSource;
Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
if (splitState == null) {
return unboundedSource.createReader(pipelineOptions, null);
} else {
try (ByteArrayInputStream bais = new ByteArrayInputStream(splitState)) {
return unboundedSource.createReader(pipelineOptions, coder.decode(bais));
}
}
}
}
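The encode/decode pair moved into this class keeps checkpoint durability entirely in the source's hands: the mark is serialized with the coder the source itself provides. A hypothetical round trip, using the SDK's CountingSource as a stand-in unbounded source (imports for CountingSource, PipelineOptionsFactory, and the byte-array streams assumed):

PipelineOptions options = PipelineOptionsFactory.create();
UnboundedSource<Long, CountingSource.CounterMark> source = CountingSource.unbounded();
UnboundedSource.UnboundedReader<Long> reader = source.createReader(options, null);
reader.start();

// Snapshot: what getAndEncodeCheckpointMark(...) does for this source.
byte[] state;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
  source.getCheckpointMarkCoder()
      .encode((CountingSource.CounterMark) reader.getCheckpointMark(), baos);
  state = baos.toByteArray();
}

// Recovery: what createUnboundedSourceReader(...) does with non-null state.
try (ByteArrayInputStream bais = new ByteArrayInputStream(state)) {
  reader = source.createReader(options, source.getCheckpointMarkCoder().decode(bais));
  reader.start();
}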
@@ -363,11 +363,9 @@ public int numCollectedRecords() {
}

public boolean allRecordsConsumed() {
boolean allRecordsConsumed = true;
for (Source<?> source : sources) {
allRecordsConsumed = allRecordsConsumed && ((TestSource) source).isConsumptionCompleted();
}
return allRecordsConsumed;
return sources.stream()
.map(TestSource.class::cast)
.allMatch(TestSource::isConsumptionCompleted);
}

public boolean allTimestampReceived() {
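The rewritten allRecordsConsumed() is behaviorally equivalent to the old accumulate-with-&& loop, but allMatch short-circuits: it stops at the first source that has not finished, whereas the loop kept iterating after the answer was known. For example:

Stream.of(1, 2, 3).allMatch(x -> x > 0);  // true: every element checked
Stream.of(-1, 2, 3).allMatch(x -> x > 0); // false: stops at the first mismatch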