From 1c89a1b3ac0ff296003ae443e6b4763f501b8ada Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Wed, 2 Mar 2016 20:45:59 -0800 Subject: [PATCH 1/5] Basic non-null checks --- .../dataflow/sdk/util/ReduceFnRunner.java | 51 ++++++++++++++++- .../dataflow/sdk/util/TriggerRunner.java | 2 + .../dataflow/sdk/util/WatermarkHold.java | 55 ++++++++++++++----- 3 files changed, 92 insertions(+), 16 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 2e2d1f6c7d8a..f1d45828f941 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -499,6 +499,11 @@ private Collection processElement(WindowedValue value) throws Excepti directContext.timestamp(), directContext.timers(), directContext.state()); + + // At this point, if triggerRunner.shouldFire before the processValue then + // triggerRunner.shouldFire after the processValue. In other words adding values + // cannot take a trigger state from firing to non-firing. + // (We don't actually assert this since it is too slow.) } return windows; @@ -568,6 +573,11 @@ public void onTimer(TimerData timer) throws Exception { boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. + // We could assert this but it is very expensive. + // Since we are processing an on-time firing we should schedule the garbage collection // timer. (If getAllowedLateness is zero then the timer event will be considered a // cleanup event and handled by the above). @@ -715,8 +725,11 @@ private void onTrigger( ReduceFn.Context renamedContext, boolean isFinished) throws Exception { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + // Prefetch necessary states - ReadableState outputTimestampFuture = + ReadableState outputTimestampFuture = watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); ReadableState paneFuture = paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); @@ -729,7 +742,41 @@ private void onTrigger( // Calculate the pane info. final PaneInfo pane = paneFuture.read(); // Extract the window hold, and as a side effect clear it. - final Instant outputTimestamp = outputTimestampFuture.read(); + + WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + final Instant outputTimestamp = pair.oldHold; + @Nullable Instant newHold = pair.newHold; + + if (newHold != null && inputWM != null) { + // We can't be finished yet. + Preconditions.checkState( + !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); + // The hold cannot be behind the input watermark. + Preconditions.checkState( + !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); + if (newHold.isAfter(directContext.window().maxTimestamp())) { + // The hold must be for garbage collection, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual( + directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); + } else { + // The hold must be for the end-of-window, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual(directContext.window().maxTimestamp()), + "new hold %s should be at end of window %s", + newHold, + directContext.window()); + Preconditions.checkState( + !isEndOfWindow, + "new hold at %s for %s but this is the watermark trigger", + newHold, + directContext.window()); + } + } // Only emit a pane if it has data or empty panes are observable. if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index dcfd03516b74..8fc498112e34 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -172,6 +172,8 @@ public boolean shouldFire(W window, Timers timers, StateAccessor state) throw } public void onFire(W window, Timers timers, StateAccessor state) throws Exception { + // shouldFire should be false. + // However it is too expensive to assert. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); Trigger.TriggerContext context = contextFactory.base(window, timers, diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index d537ddb0e80d..31e36c5ef3a8 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -228,6 +228,7 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); // Only add the hold if we can be sure: // - the backend will be able to respect it @@ -287,6 +288,8 @@ private Instant addEndOfWindowHold(ReduceFn.Context context) { // by the end of window (ie the end of window is at or ahead of the input watermark). Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); @@ -329,6 +332,8 @@ private Instant addGarbageCollectionHold(ReduceFn.Context context) { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -368,6 +373,19 @@ public void onMerge(ReduceFn.OnMergeContext context) { addEndOfWindowOrGarbageCollectionHolds(context); } + /** + * Result of {@link #extractAndRelease}. + */ + public static class OldAndNewHolds { + public final Instant oldHold; + @Nullable public final Instant newHold; + + public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { + this.oldHold = oldHold; + this.newHold = newHold; + } + } + /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. @@ -377,7 +395,7 @@ public void onMerge(ReduceFn.OnMergeContext context) { * elements in the current pane. If there is no such value the timestamp is the end * of the window. */ - public ReadableState extractAndRelease( + public ReadableState extractAndRelease( final ReduceFn.Context context, final boolean isFinished) { WindowTracing.debug( "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -385,38 +403,38 @@ public ReadableState extractAndRelease( timerInternals.currentOutputWatermarkTime()); final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag); final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); - return new ReadableState() { + return new ReadableState() { @Override - public ReadableState readLater() { + public ReadableState readLater() { elementHoldState.readLater(); extraHoldState.readLater(); return this; } @Override - public Instant read() { + public OldAndNewHolds read() { // Read both the element and extra holds. Instant elementHold = elementHoldState.read(); Instant extraHold = extraHoldState.read(); - Instant hold; + Instant oldHold; // Find the minimum, accounting for null. if (elementHold == null) { - hold = extraHold; + oldHold = extraHold; } else if (extraHold == null) { - hold = elementHold; + oldHold = elementHold; } else if (elementHold.isBefore(extraHold)) { - hold = elementHold; + oldHold = elementHold; } else { - hold = extraHold; + oldHold = extraHold; } - if (hold == null || hold.isAfter(context.window().maxTimestamp())) { + if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { // If no hold (eg because all elements came in behind the output watermark), or // the hold was for garbage collection, take the end of window as the result. WindowTracing.debug( "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " + "for key:{}; window:{}", - hold, context.key(), context.window()); - hold = context.window().maxTimestamp(); + oldHold, context.key(), context.window()); + oldHold = context.window().maxTimestamp(); } WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", context.key(), context.window()); @@ -425,13 +443,14 @@ public Instant read() { elementHoldState.clear(); extraHoldState.clear(); + @Nullable Instant newHold = null; if (!isFinished) { // Only need to leave behind an end-of-window or garbage collection hold // if future elements will be processed. - addEndOfWindowOrGarbageCollectionHolds(context); + newHold = addEndOfWindowOrGarbageCollectionHolds(context); } - return hold; + return new OldAndNewHolds(oldHold, newHold); } }; } @@ -447,4 +466,12 @@ public void clearHolds(ReduceFn.Context context) { context.state().access(elementHoldTag).clear(); context.state().access(EXTRA_HOLD_TAG).clear(); } + + /** + * Return the current data hold, or null if none. Does not clear. For debugging only. + */ + @Nullable + public Instant getDataCurrent(ReduceFn.Context context) { + return context.state().access(elementHoldTag).read(); + } } From 1cb04908098df514e5b98a7ebd128512fbd7522d Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Fri, 4 Mar 2016 15:16:33 -0800 Subject: [PATCH 2/5] Input watermarks can never be null. --- .../inprocess/InMemoryWatermarkManager.java | 3 ++- .../inprocess/InProcessTimerInternals.java | 1 - .../sdk/util/LateDataDroppingDoFnRunner.java | 3 +-- .../dataflow/sdk/util/PaneInfoTracker.java | 2 +- .../sdk/util/ReduceFnContextFactory.java | 1 - .../dataflow/sdk/util/ReduceFnRunner.java | 18 +++++++++--------- .../dataflow/sdk/util/TimerInternals.java | 4 ++-- .../google/cloud/dataflow/sdk/util/Timers.java | 3 +-- .../sdk/util/TriggerContextFactory.java | 1 - .../cloud/dataflow/sdk/util/WatermarkHold.java | 9 +++------ .../dataflow/sdk/util/ReduceFnTester.java | 8 +++----- .../cloud/dataflow/sdk/util/TriggerTester.java | 17 +++++------------ 12 files changed, 27 insertions(+), 43 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index a9a62a6aa388..c4d67db12124 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -994,7 +995,7 @@ private TransformWatermarks( * Returns the input watermark of the {@link AppliedPTransform}. */ public Instant getInputWatermark() { - return inputWatermark.get(); + return Preconditions.checkNotNull(inputWatermark.get()); } /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java index 06ba7b82f432..1d075c54c2a3 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java @@ -70,7 +70,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { return watermarks.getInputWatermark(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java index 31927ab8823b..3dfa06474ef3 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java @@ -138,8 +138,7 @@ public boolean apply(WindowedValue input) { /** Is {@code window} expired w.r.t. the garbage collection watermark? */ private boolean canDropDueToExpiredWindow(BoundedWindow window) { Instant inputWM = timerInternals.currentInputWatermarkTime(); - return inputWM != null - && window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); + return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); } } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java index a7818a30a037..9fa36b0eb765 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java @@ -103,7 +103,7 @@ private PaneInfo describePane( boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY; // True is the input watermark hasn't passed the window's max timestamp. - boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp); + boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp); Timing timing; if (isLateForOutput || !onlyEarlyPanesSoFar) { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java index bdbaf1098e3a..7649d520bb96 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java @@ -146,7 +146,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index f1d45828f941..560d8ecd44f3 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -537,6 +537,10 @@ public void onTimer(TimerData timer) throws Exception { "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } + // If this is an end-of-window timer then, we need to set a GC timer + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + // If this is a garbage collection timer then we should trigger and garbage collect the window. Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); boolean isGarbageCollection = @@ -553,7 +557,7 @@ public void onTimer(TimerData timer) throws Exception { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. - onTrigger(directContext, renamedContext, true/* isFinished */); + onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); } // Cleanup flavor B: Clear all the remaining state for this window since we'll never @@ -569,9 +573,6 @@ public void onTimer(TimerData timer) throws Exception { emitIfAppropriate(directContext, renamedContext); } - // If this is an end-of-window timer then, we need to set a GC timer - boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { // If the window strategy trigger includes a watermark trigger then at this point // there should be no data holds, either because we'd already cleared them on an @@ -676,7 +677,7 @@ private void emitIfAppropriate(ReduceFn.Context directCon // Run onTrigger to produce the actual pane contents. // As a side effect it will clear all element holds, but not necessarily any // end-of-window or garbage collection holds. - onTrigger(directContext, renamedContext, isFinished); + onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/); // Now that we've triggered, the pane is empty. nonEmptyPanes.clearPane(renamedContext.state()); @@ -723,10 +724,9 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing private void onTrigger( final ReduceFn.Context directContext, ReduceFn.Context renamedContext, - boolean isFinished) + boolean isFinished, boolean isEndOfWindow) throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); // Prefetch necessary states ReadableState outputTimestampFuture = @@ -747,7 +747,7 @@ private void onTrigger( final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; - if (newHold != null && inputWM != null) { + if (newHold != null) { // We can't be finished yet. Preconditions.checkState( !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); @@ -825,7 +825,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer( Instant endOfWindow = directContext.window().maxTimestamp(); Instant fireTime; String which; - if (inputWM != null && endOfWindow.isBefore(inputWM)) { + if (endOfWindow.isBefore(inputWM)) { fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness()); which = "garbage collection"; } else { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java index c823ed39b167..b26e6e87b05f 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java @@ -79,10 +79,11 @@ public interface TimerInternals { /** * Return the current, local input watermark timestamp for this computation - * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown. + * in the {@link TimeDomain#EVENT_TIME} time domain. * *

This value: *

    + *
  1. Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. *
  2. Is monotonically increasing. *
  3. May differ between workers due to network and other delays. *
  4. Will never be ahead of the global input watermark for this computation. But it @@ -95,7 +96,6 @@ public interface TimerInternals { * it is possible for an element to be considered locally on-time even though it is * globally late. */ - @Nullable Instant currentInputWatermarkTime(); /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java index 7d4b4f2abe57..2ddf5245b8f2 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java @@ -54,7 +54,6 @@ public interface Timers { @Nullable public abstract Instant currentSynchronizedProcessingTime(); - /** Returns the current event time or {@code null} if unknown. */ - @Nullable + /** Returns the current event time. */ public abstract Instant currentEventTime(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java index 64ff402a9aec..50e8b324eafb 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java @@ -209,7 +209,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timers.currentEventTime(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index 31e36c5ef3a8..7f814c457a61 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -228,7 +228,6 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); // Only add the hold if we can be sure: // - the backend will be able to respect it @@ -242,7 +241,7 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) if (outputWM != null && elementHold.isBefore(outputWM)) { which = "too late to effect output watermark"; tooLate = true; - } else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM)) { + } else if (context.window().maxTimestamp().isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -288,12 +287,11 @@ private Instant addEndOfWindowHold(ReduceFn.Context context) { // by the end of window (ie the end of window is at or ahead of the input watermark). Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); - if (inputWM != null && eowHold.isBefore(inputWM)) { + if (eowHold.isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -332,13 +330,12 @@ private Instant addGarbageCollectionHold(ReduceFn.Context context) { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM); - Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM), + Preconditions.checkState(!gcHold.isBefore(inputWM), "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM); context.state().access(EXTRA_HOLD_TAG).add(gcHold); return gcHold; diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java index d4620a7827c5..4aeaa0cb99ac 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java @@ -599,7 +599,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -666,9 +666,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -692,7 +691,7 @@ public void advanceInputWatermark( ReduceFnRunner runner, Instant newInputWatermark) throws Exception { Preconditions.checkNotNull(newInputWatermark); Preconditions.checkState( - inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + !newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -713,7 +712,6 @@ public void advanceInputWatermark( public void advanceOutputWatermark(Instant newOutputWatermark) { Preconditions.checkNotNull(newOutputWatermark); - Preconditions.checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java index 0c7183020291..cdbf5f6b9183 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java @@ -21,12 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.junit.Assert.assertTrue; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; -import com.google.cloud.dataflow.sdk.transforms.windowing.TriggerBuilder; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; import com.google.cloud.dataflow.sdk.util.ActiveWindowSet.MergeCallback; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode; @@ -41,6 +36,7 @@ import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState; import com.google.cloud.dataflow.sdk.values.TimestampedValue; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -428,7 +424,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -471,9 +467,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -495,7 +490,7 @@ public String toString() { public void advanceInputWatermark(Instant newInputWatermark) throws Exception { checkNotNull(newInputWatermark); - checkState(inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + checkState(!newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -513,7 +508,6 @@ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception { checkNotNull(newOutputWatermark); - checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", @@ -577,7 +571,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); } From d9324089dab09e97db7ff845788f7b43cbdac312 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Sat, 5 Mar 2016 09:40:10 -0800 Subject: [PATCH 3/5] No .* imports. --- .../com/google/cloud/dataflow/sdk/util/TriggerTester.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java index cdbf5f6b9183..f291438f8bda 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java @@ -21,7 +21,12 @@ import static com.google.common.base.Preconditions.checkState; import static org.junit.Assert.assertTrue; -import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; +import com.google.cloud.dataflow.sdk.transforms.windowing.TriggerBuilder; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; import com.google.cloud.dataflow.sdk.util.ActiveWindowSet.MergeCallback; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode; From fb3943ed3347356aabb3d69c4564c1b9a2456d73 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Mon, 7 Mar 2016 10:09:41 -0800 Subject: [PATCH 4/5] Fix broken build --- .../java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java index 1daadc7e2b7f..cc609532ba76 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java @@ -1,3 +1,4 @@ + /* * Copyright (C) 2016 Google Inc. * From b26c268b5d9eea0a62a461d8beeb8aba9fb81dd0 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Fri, 25 Mar 2016 13:37:34 -0700 Subject: [PATCH 5/5] Ken's comments --- .../dataflow/sdk/util/ReduceFnRunner.java | 40 +++++++++---------- .../dataflow/sdk/util/TriggerRunner.java | 2 +- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 560d8ecd44f3..c7180c9074cc 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -557,7 +557,7 @@ public void onTimer(TimerData timer) throws Exception { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. - onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); + onTrigger(directContext, renamedContext, true/* isFinished */); } // Cleanup flavor B: Clear all the remaining state for this window since we'll never @@ -677,7 +677,7 @@ private void emitIfAppropriate(ReduceFn.Context directCon // Run onTrigger to produce the actual pane contents. // As a side effect it will clear all element holds, but not necessarily any // end-of-window or garbage collection holds. - onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/); + onTrigger(directContext, renamedContext, isFinished); // Now that we've triggered, the pane is empty. nonEmptyPanes.clearPane(renamedContext.state()); @@ -723,8 +723,7 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing */ private void onTrigger( final ReduceFn.Context directContext, - ReduceFn.Context renamedContext, - boolean isFinished, boolean isEndOfWindow) + ReduceFn.Context renamedContext, boolean isFinished) throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); @@ -750,31 +749,32 @@ private void onTrigger( if (newHold != null) { // We can't be finished yet. Preconditions.checkState( - !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); + !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); // The hold cannot be behind the input watermark. Preconditions.checkState( - !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); + !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); if (newHold.isAfter(directContext.window().maxTimestamp())) { // The hold must be for garbage collection, which can't have happened yet. Preconditions.checkState( - newHold.isEqual( - directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())), - "new hold %s should be at garbage collection for window %s plus %s", - newHold, - directContext.window(), - windowingStrategy.getAllowedLateness()); + newHold.isEqual( + directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); } else { // The hold must be for the end-of-window, which can't have happened yet. Preconditions.checkState( - newHold.isEqual(directContext.window().maxTimestamp()), - "new hold %s should be at end of window %s", - newHold, - directContext.window()); + newHold.isEqual(directContext.window().maxTimestamp()), + "new hold %s should be at end of window %s", + newHold, + directContext.window()); Preconditions.checkState( - !isEndOfWindow, - "new hold at %s for %s but this is the watermark trigger", - newHold, - directContext.window()); + !inputWM.isAfter(directContext.window().maxTimestamp()), + "new hold at %s for %s but input watermark %s already beyond end of window", + newHold, + directContext.window(), + inputWM); } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index 8fc498112e34..49ccaa3b250a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -172,7 +172,7 @@ public boolean shouldFire(W window, Timers timers, StateAccessor state) throw } public void onFire(W window, Timers timers, StateAccessor state) throws Exception { - // shouldFire should be false. + // shouldFire should be true. // However it is too expensive to assert. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();