Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand All @@ -58,7 +57,6 @@
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
import org.apache.beam.runners.flink.translation.utils.NoopLock;
import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
Expand Down Expand Up @@ -146,7 +144,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window

protected transient SideInputReader sideInputReader;

protected transient BufferedOutputManager<OutputT> outputManager;
protected transient FlinkOutputManager<OutputT> outputManager;

private transient DoFnInvoker<InputT, OutputT> doFnInvoker;

Expand Down Expand Up @@ -385,19 +383,7 @@ public void initializeState(StateInitializationContext context) throws Exception

outputManager =
outputManagerFactory.create(
output,
getLockToAcquireForStateAccessDuringBundles(),
getOperatorStateBackend(),
getKeyedStateBackend(),
keySelector);
}

/**
* Subclasses may provide a lock to ensure that the state backend is not accessed concurrently
* during bundle execution.
*/
protected Lock getLockToAcquireForStateAccessDuringBundles() {
return NoopLock.get();
output, getOperatorStateBackend(), getKeyedStateBackend(), keySelector);
}

@Override
Expand Down Expand Up @@ -719,7 +705,6 @@ private void emitAllPushedBackData() throws Exception {
*/
private void checkInvokeStartBundle() {
if (bundleStarted.compareAndSet(false, true)) {
outputManager.flushBuffer();
pushbackDoFnRunner.startBundle();
}
}
Expand Down Expand Up @@ -754,23 +739,23 @@ protected final void invokeFinishBundle() {
}
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) {
// Finish the current bundle before the snapshot barrier is sent downstream. This give us a
// clean state before taking the actual snapshot.
// Ensure that no new bundle gets started as part of finishing a bundle.
while (bundleStarted.get()) {
invokeFinishBundle();
}
}

@Override
public final void snapshotState(StateSnapshotContext context) throws Exception {
if (requiresStableInput) {
// We notify the BufferingDoFnRunner to associate buffered state with this
// snapshot id and start a new buffer for elements arriving after this snapshot.
bufferingDoFnRunner.checkpoint(context.getCheckpointId());
}

// We can't output here anymore because the checkpoint barrier has already been
// sent downstream. This is going to change with 1.6/1.7's prepareSnapshotBarrier.
outputManager.openBuffer();
// Ensure that no new bundle gets started as part of finishing a bundle
while (bundleStarted.get()) {
invokeFinishBundle();
}
outputManager.closeBuffer();

super.snapshotState(context);
}

Expand Down Expand Up @@ -821,33 +806,22 @@ private void setCurrentOutputWatermark(long currentOutputWatermark) {
this.currentOutputWatermark = currentOutputWatermark;
}

/** Factory for creating an {@link BufferedOutputManager} from a Flink {@link Output}. */
/** Factory for creating an {@link FlinkOutputManager} from a Flink {@link Output}. */
interface OutputManagerFactory<OutputT> extends Serializable {
BufferedOutputManager<OutputT> create(
FlinkOutputManager<OutputT> create(
Output<StreamRecord<WindowedValue<OutputT>>> output,
Lock bufferLock,
@Nullable OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend keyedStateBackend,
@Nullable KeySelector keySelector)
throws Exception;
}

/**
* A {@link DoFnRunners.OutputManager} that can buffer its outputs. Uses {@link
* PushedBackElementsHandler} to buffer the data. Buffering data is necessary because no elements
* can be emitted during {@code snapshotState}. This can be removed once we upgrade Flink to >=
* 1.6 which allows us to finish the bundle before the checkpoint barriers have been emitted.
*/
public static class BufferedOutputManager<OutputT> implements DoFnRunners.OutputManager {
/** A {@link DoFnRunners.OutputManager} that forwards data to the Flink runtime. */
public static class FlinkOutputManager<OutputT> implements DoFnRunners.OutputManager {

private final TupleTag<OutputT> mainTag;
private final Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
private final Map<TupleTag<?>, Integer> tagsToIds;
/**
* A lock to be acquired before writing to the buffer. This lock will only be acquired during
* buffering. It will not be acquired during flushing the buffer.
*/
private final Lock bufferLock;

private Map<Integer, TupleTag<?>> idsToTags;
/** Elements buffered during a snapshot, by output id. */
Expand All @@ -856,75 +830,25 @@ public static class BufferedOutputManager<OutputT> implements DoFnRunners.Output

protected final Output<StreamRecord<WindowedValue<OutputT>>> output;

private boolean openBuffer = false;

BufferedOutputManager(
FlinkOutputManager(
Output<StreamRecord<WindowedValue<OutputT>>> output,
TupleTag<OutputT> mainTag,
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
Map<TupleTag<?>, Integer> tagsToIds,
Lock bufferLock,
PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler) {
this.output = output;
this.mainTag = mainTag;
this.tagsToOutputTags = tagsToOutputTags;
this.tagsToIds = tagsToIds;
this.bufferLock = bufferLock;
this.idsToTags = new HashMap<>();
for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
idsToTags.put(entry.getValue(), entry.getKey());
}
this.pushedBackElementsHandler = pushedBackElementsHandler;
}

void openBuffer() {
this.openBuffer = true;
}

void closeBuffer() {
this.openBuffer = false;
}

@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
if (!openBuffer) {
emit(tag, value);
} else {
buffer(KV.of(tagsToIds.get(tag), value));
}
}

private void buffer(KV<Integer, WindowedValue<?>> taggedValue) {
try {
bufferLock.lock();
pushedBackElementsHandler.pushBack(taggedValue);
} catch (Exception e) {
throw new RuntimeException("Couldn't pushback element.", e);
} finally {
bufferLock.unlock();
}
}

/**
* Flush elements of bufferState to Flink Output. This method can't be invoked in {@link
* #snapshotState(StateSnapshotContext)}. The buffer should be flushed before starting a new
* bundle when the buffer cannot be concurrently accessed and thus does not need to be guarded
* by a lock.
*/
void flushBuffer() {
try {
pushedBackElementsHandler
.getElements()
.forEach(
element ->
emit(idsToTags.get(element.getKey()), (WindowedValue) element.getValue()));
pushedBackElementsHandler.clear();
} catch (Exception e) {
throw new RuntimeException("Couldn't flush pushed back elements.", e);
}
}

private <T> void emit(TupleTag<T> tag, WindowedValue<T> value) {
if (tag.equals(mainTag)) {
// with tagged outputs we can't get around this because we don't
// know our own output type...
Expand Down Expand Up @@ -977,8 +901,8 @@ public void verifyDeterministic() throws NonDeterministicException {
}

/**
* Implementation of {@link OutputManagerFactory} that creates an {@link BufferedOutputManager}
* that can write to multiple logical outputs by Flink side output.
* Implementation of {@link OutputManagerFactory} that creates an {@link FlinkOutputManager} that
* can write to multiple logical outputs by Flink side output.
*/
public static class MultiOutputOutputManagerFactory<OutputT>
implements OutputManagerFactory<OutputT> {
Expand Down Expand Up @@ -1013,15 +937,13 @@ public MultiOutputOutputManagerFactory(
}

@Override
public BufferedOutputManager<OutputT> create(
public FlinkOutputManager<OutputT> create(
Output<StreamRecord<WindowedValue<OutputT>>> output,
Lock bufferLock,
OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend keyedStateBackend,
@Nullable KeySelector keySelector)
throws Exception {
Preconditions.checkNotNull(output);
Preconditions.checkNotNull(bufferLock);
Preconditions.checkNotNull(operatorStateBackend);
Preconditions.checkState(
(keyedStateBackend == null) == (keySelector == null),
Expand All @@ -1046,8 +968,8 @@ public BufferedOutputManager<OutputT> create(
pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
}

return new BufferedOutputManager<>(
output, mainTag, tagsToOutputTags, tagsToIds, bufferLock, pushedBackElementsHandler);
return new FlinkOutputManager<>(
output, mainTag, tagsToOutputTags, tagsToIds, pushedBackElementsHandler);
}

private TaggedKvCoder buildTaggedKvCoder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ public ExecutableStageDoFnOperator(
this.stateBackendLock = new ReentrantLock();
}

@Override
protected Lock getLockToAcquireForStateAccessDuringBundles() {
return stateBackendLock;
}

@Override
public void open() throws Exception {
executableStage = ExecutableStage.fromPayload(payload);
Expand Down Expand Up @@ -526,7 +521,7 @@ private static class SdkHarnessDoFnRunner<InputT, OutputT>
private final StageBundleFactory stageBundleFactory;
private final StateRequestHandler stateRequestHandler;
private final BundleProgressHandler progressHandler;
private final BufferedOutputManager<OutputT> outputManager;
private final FlinkOutputManager<OutputT> outputManager;
private final Map<String, TupleTag<?>> outputMap;
/** Timer Output Pcollection id => TimerSpec. */
private final Map<String, TimerSpec> timerOutputIdToSpecMap;
Expand All @@ -543,7 +538,7 @@ public SdkHarnessDoFnRunner(
StageBundleFactory stageBundleFactory,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler,
BufferedOutputManager<OutputT> outputManager,
FlinkOutputManager<OutputT> outputManager,
Map<String, TupleTag<?>> outputMap,
Coder<BoundedWindow> windowCoder,
BiConsumer<WindowedValue<InputT>, TimerInternals.TimerData> timerRegistration,
Expand Down
Loading