Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public Dataflow create(PipelineOptions options) {

/**
* Whether to update the currently running pipeline with the same name as this one.
*
*
* @deprecated This property is replaced by @{link DataflowPipelineOptions#getUpdate()}
*/
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,11 @@ public void clear(StateAccessor<?> state) {
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
* info includes the timing for the pane, who's calculation is quite subtle.
*
* @param isEndOfWindow should be {@code true} only if the pane is being emitted
* because an end-of-window timer has fired and the trigger agreed we should fire.
* @param isFinal should be {@code true} only if the triggering machinery can guarantee
* no further firings for the
*/
public ReadableState<PaneInfo> getNextPaneInfo(ReduceFn<?, ?, ?, ?>.Context context,
final boolean isEndOfWindow, final boolean isFinal) {
public ReadableState<PaneInfo> getNextPaneInfo(
ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
final Object key = context.key();
final ReadableState<PaneInfo> previousPaneFuture =
context.state().access(PaneInfoTracker.PANE_INFO_TAG);
Expand All @@ -76,7 +74,7 @@ public ReadableState<PaneInfo> readLater() {
@Override
public PaneInfo read() {
PaneInfo previousPane = previousPaneFuture.read();
return describePane(key, windowMaxTimestamp, previousPane, isEndOfWindow, isFinal);
return describePane(key, windowMaxTimestamp, previousPane, isFinal);
}
};
}
Expand All @@ -85,8 +83,8 @@ public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo
context.state().access(PANE_INFO_TAG).write(currentPane);
}

private <W> PaneInfo describePane(Object key, Instant windowMaxTimestamp, PaneInfo previousPane,
boolean isEndOfWindow, boolean isFinal) {
private <W> PaneInfo describePane(
Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
boolean isFirst = previousPane == null;
Timing previousTiming = isFirst ? null : previousPane.getTiming();
long index = isFirst ? 0 : previousPane.getIndex() + 1;
Expand All @@ -104,26 +102,28 @@ private <W> PaneInfo describePane(Object key, Instant windowMaxTimestamp, PaneIn
// if the output watermark is behind the end of the window.
boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;

// True is the input watermark hasn't passed the window's max timestamp.
boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);

Timing timing;
if (isLateForOutput || !onlyEarlyPanesSoFar) {
// The output watermark has already passed the end of this window, or we have already
// emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
// consider this pane LATE.
timing = Timing.LATE;
} else if (isEndOfWindow) {
// This is the unique ON_TIME firing for the window.
timing = Timing.ON_TIME;
} else {
// All other cases are EARLY.
} else if (isEarlyForInput) {
// This is an EARLY firing.
timing = Timing.EARLY;
nonSpeculativeIndex = -1;
} else {
// This is the unique ON_TIME firing for the window.
timing = Timing.ON_TIME;
}

WindowTracing.debug(
"describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
+ "inputWatermark:{}; outputWatermark:{}; isEndOfWindow:{}; isLateForOutput:{}",
timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isEndOfWindow,
isLateForOutput);
+ "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);

if (previousPane != null) {
// Timing transitions should follow EARLY* ON_TIME? LATE*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
contextFactory.base(mergedWindow, StateStyle.RENAMED);
triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
emitIfAppropriate(directContext, renamedContext, false/* isEndOfWindow */);
emitIfAppropriate(directContext, renamedContext);
}

// We're all done with merging and emitting elements so can compress the activeWindow state.
Expand Down Expand Up @@ -532,14 +532,6 @@ public void onTimer(TimerData timer) throws Exception {
"ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
}

// If this is an end-of-window timer then:
// 1. We need to set a GC timer
// 2. We need to let the PaneInfoTracker know that we are transitioning from early to late,
// and possibly emitting an on-time pane.
boolean isEndOfWindow =
TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());

// If this is a garbage collection timer then we should trigger and garbage collect the window.
Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
boolean isGarbageCollection =
Expand All @@ -556,7 +548,7 @@ public void onTimer(TimerData timer) throws Exception {
// We need to call onTrigger to emit the final pane if required.
// The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
// and the watermark has passed the end of the window.
onTrigger(directContext, renamedContext, isEndOfWindow, true/* isFinished */);
onTrigger(directContext, renamedContext, true/* isFinished */);
}

// Cleanup flavor B: Clear all the remaining state for this window since we'll never
Expand All @@ -569,9 +561,12 @@ public void onTimer(TimerData timer) throws Exception {
key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
if (windowIsActive) {
emitIfAppropriate(directContext, renamedContext, isEndOfWindow);
emitIfAppropriate(directContext, renamedContext);
}

// If this is an end-of-window timer then, we need to set a GC timer
boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());
if (isEndOfWindow) {
// Since we are processing an on-time firing we should schedule the garbage collection
// timer. (If getAllowedLateness is zero then the timer event will be considered a
Expand Down Expand Up @@ -649,7 +644,7 @@ private boolean shouldDiscardAfterFiring(boolean isFinished) {
* Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
*/
private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext, boolean isEndOfWindow)
ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
throws Exception {
if (!triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state())) {
Expand All @@ -667,7 +662,7 @@ private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directCon
// Run onTrigger to produce the actual pane contents.
// As a side effect it will clear all element holds, but not necessarily any
// end-of-window or garbage collection holds.
onTrigger(directContext, renamedContext, isEndOfWindow, isFinished);
onTrigger(directContext, renamedContext, isFinished);

// Now that we've triggered, the pane is empty.
nonEmptyPanes.clearPane(renamedContext.state());
Expand All @@ -692,13 +687,12 @@ private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directCon
/**
* Do we need to emit a pane?
*/
private boolean needToEmit(
boolean isEmpty, boolean isEndOfWindow, boolean isFinished, PaneInfo.Timing timing) {
private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
if (!isEmpty) {
// The pane has elements.
return true;
}
if (isEndOfWindow && timing == Timing.ON_TIME) {
if (timing == Timing.ON_TIME) {
// This is the unique ON_TIME pane.
return true;
}
Expand All @@ -715,14 +709,13 @@ private boolean needToEmit(
private void onTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
boolean isEndOfWindow,
boolean isFinished)
throws Exception {
// Prefetch necessary states
ReadableState<Instant> outputTimestampFuture =
watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
ReadableState<PaneInfo> paneFuture =
paneInfoTracker.getNextPaneInfo(directContext, isEndOfWindow, isFinished).readLater();
paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
ReadableState<Boolean> isEmptyFuture =
nonEmptyPanes.isEmpty(renamedContext.state()).readLater();

Expand All @@ -735,7 +728,7 @@ private void onTrigger(
final Instant outputTimestamp = outputTimestampFuture.read();

// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmptyFuture.read(), isEndOfWindow, isFinished, pane.getTiming())) {
if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
// Run reduceFn.onTrigger method.
final List<W> windows = Collections.singletonList(directContext.window());
ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,19 +479,20 @@ public void testPaneInfoAllStates() throws Exception {
when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 3);
assertThat(tester.extractOutput(), contains(
// This is late, because the trigger wasn't waiting for AfterWatermark
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 2, -1))));
WindowMatchers.valueWithPaneInfo(
PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));

when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 4);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 3, -1))));
WindowMatchers.valueWithPaneInfo(
PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));

when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
triggerShouldFinish(mockTrigger);
injectElement(tester, 5);
assertThat(tester.extractOutput(), contains(
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.EARLY, 4, -1))));
WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2))));
}

@Test
Expand Down