diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java index a14be44987..1da2b435fa 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java @@ -172,7 +172,7 @@ public Dataflow create(PipelineOptions options) { /** * Whether to update the currently running pipeline with the same name as this one. - * + * * @deprecated This property is replaced by @{link DataflowPipelineOptions#getUpdate()} */ @Deprecated diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java index 38499c2e2b..a7818a30a0 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java @@ -54,13 +54,11 @@ public void clear(StateAccessor state) { * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane * info includes the timing for the pane, who's calculation is quite subtle. * - * @param isEndOfWindow should be {@code true} only if the pane is being emitted - * because an end-of-window timer has fired and the trigger agreed we should fire. * @param isFinal should be {@code true} only if the triggering machinery can guarantee * no further firings for the */ - public ReadableState getNextPaneInfo(ReduceFn.Context context, - final boolean isEndOfWindow, final boolean isFinal) { + public ReadableState getNextPaneInfo( + ReduceFn.Context context, final boolean isFinal) { final Object key = context.key(); final ReadableState previousPaneFuture = context.state().access(PaneInfoTracker.PANE_INFO_TAG); @@ -76,7 +74,7 @@ public ReadableState readLater() { @Override public PaneInfo read() { PaneInfo previousPane = previousPaneFuture.read(); - return describePane(key, windowMaxTimestamp, previousPane, isEndOfWindow, isFinal); + return describePane(key, windowMaxTimestamp, previousPane, isFinal); } }; } @@ -85,8 +83,8 @@ public void storeCurrentPaneInfo(ReduceFn.Context context, PaneInfo context.state().access(PANE_INFO_TAG).write(currentPane); } - private PaneInfo describePane(Object key, Instant windowMaxTimestamp, PaneInfo previousPane, - boolean isEndOfWindow, boolean isFinal) { + private PaneInfo describePane( + Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) { boolean isFirst = previousPane == null; Timing previousTiming = isFirst ? null : previousPane.getTiming(); long index = isFirst ? 0 : previousPane.getIndex() + 1; @@ -104,26 +102,28 @@ private PaneInfo describePane(Object key, Instant windowMaxTimestamp, PaneIn // if the output watermark is behind the end of the window. boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY; + // True is the input watermark hasn't passed the window's max timestamp. + boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp); + Timing timing; if (isLateForOutput || !onlyEarlyPanesSoFar) { // The output watermark has already passed the end of this window, or we have already // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must // consider this pane LATE. timing = Timing.LATE; - } else if (isEndOfWindow) { - // This is the unique ON_TIME firing for the window. - timing = Timing.ON_TIME; - } else { - // All other cases are EARLY. + } else if (isEarlyForInput) { + // This is an EARLY firing. timing = Timing.EARLY; nonSpeculativeIndex = -1; + } else { + // This is the unique ON_TIME firing for the window. + timing = Timing.ON_TIME; } WindowTracing.debug( "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; " - + "inputWatermark:{}; outputWatermark:{}; isEndOfWindow:{}; isLateForOutput:{}", - timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isEndOfWindow, - isLateForOutput); + + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}", + timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput); if (previousPane != null) { // Timing transitions should follow EARLY* ON_TIME? LATE* diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index fe5c474210..1a009bb051 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -289,7 +289,7 @@ public void processElements(Iterable> values) throws Excep ReduceFn.Context renamedContext = contextFactory.base(mergedWindow, StateStyle.RENAMED); triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); - emitIfAppropriate(directContext, renamedContext, false/* isEndOfWindow */); + emitIfAppropriate(directContext, renamedContext); } // We're all done with merging and emitting elements so can compress the activeWindow state. @@ -532,14 +532,6 @@ public void onTimer(TimerData timer) throws Exception { "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } - // If this is an end-of-window timer then: - // 1. We need to set a GC timer - // 2. We need to let the PaneInfoTracker know that we are transitioning from early to late, - // and possibly emitting an on-time pane. - boolean isEndOfWindow = - TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); - // If this is a garbage collection timer then we should trigger and garbage collect the window. Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); boolean isGarbageCollection = @@ -556,7 +548,7 @@ public void onTimer(TimerData timer) throws Exception { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. - onTrigger(directContext, renamedContext, isEndOfWindow, true/* isFinished */); + onTrigger(directContext, renamedContext, true/* isFinished */); } // Cleanup flavor B: Clear all the remaining state for this window since we'll never @@ -569,9 +561,12 @@ public void onTimer(TimerData timer) throws Exception { key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); if (windowIsActive) { - emitIfAppropriate(directContext, renamedContext, isEndOfWindow); + emitIfAppropriate(directContext, renamedContext); } + // If this is an end-of-window timer then, we need to set a GC timer + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { // Since we are processing an on-time firing we should schedule the garbage collection // timer. (If getAllowedLateness is zero then the timer event will be considered a @@ -649,7 +644,7 @@ private boolean shouldDiscardAfterFiring(boolean isFinished) { * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state. */ private void emitIfAppropriate(ReduceFn.Context directContext, - ReduceFn.Context renamedContext, boolean isEndOfWindow) + ReduceFn.Context renamedContext) throws Exception { if (!triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { @@ -667,7 +662,7 @@ private void emitIfAppropriate(ReduceFn.Context directCon // Run onTrigger to produce the actual pane contents. // As a side effect it will clear all element holds, but not necessarily any // end-of-window or garbage collection holds. - onTrigger(directContext, renamedContext, isEndOfWindow, isFinished); + onTrigger(directContext, renamedContext, isFinished); // Now that we've triggered, the pane is empty. nonEmptyPanes.clearPane(renamedContext.state()); @@ -692,13 +687,12 @@ private void emitIfAppropriate(ReduceFn.Context directCon /** * Do we need to emit a pane? */ - private boolean needToEmit( - boolean isEmpty, boolean isEndOfWindow, boolean isFinished, PaneInfo.Timing timing) { + private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { if (!isEmpty) { // The pane has elements. return true; } - if (isEndOfWindow && timing == Timing.ON_TIME) { + if (timing == Timing.ON_TIME) { // This is the unique ON_TIME pane. return true; } @@ -715,14 +709,13 @@ private boolean needToEmit( private void onTrigger( final ReduceFn.Context directContext, ReduceFn.Context renamedContext, - boolean isEndOfWindow, boolean isFinished) throws Exception { // Prefetch necessary states ReadableState outputTimestampFuture = watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); ReadableState paneFuture = - paneInfoTracker.getNextPaneInfo(directContext, isEndOfWindow, isFinished).readLater(); + paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); ReadableState isEmptyFuture = nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); @@ -735,7 +728,7 @@ private void onTrigger( final Instant outputTimestamp = outputTimestampFuture.read(); // Only emit a pane if it has data or empty panes are observable. - if (needToEmit(isEmptyFuture.read(), isEndOfWindow, isFinished, pane.getTiming())) { + if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method. final List windows = Collections.singletonList(directContext.window()); ReduceFn.OnTriggerContext renamedTriggerContext = diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index c85b1ca4a5..4fb3e37cb4 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -479,19 +479,20 @@ public void testPaneInfoAllStates() throws Exception { when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 3); assertThat(tester.extractOutput(), contains( - // This is late, because the trigger wasn't waiting for AfterWatermark - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 2, -1)))); + WindowMatchers.valueWithPaneInfo( + PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)))); when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 4); assertThat(tester.extractOutput(), contains( - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 3, -1)))); + WindowMatchers.valueWithPaneInfo( + PaneInfo.createPane(false, false, Timing.LATE, 3, 1)))); when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); triggerShouldFinish(mockTrigger); injectElement(tester, 5); assertThat(tester.extractOutput(), contains( - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.EARLY, 4, -1)))); + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2)))); } @Test