diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index a9a62a6aa388..c4d67db12124 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -994,7 +995,7 @@ private TransformWatermarks( * Returns the input watermark of the {@link AppliedPTransform}. */ public Instant getInputWatermark() { - return inputWatermark.get(); + return Preconditions.checkNotNull(inputWatermark.get()); } /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java index 06ba7b82f432..1d075c54c2a3 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java @@ -70,7 +70,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { return watermarks.getInputWatermark(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java index 
31927ab8823b..3dfa06474ef3 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java @@ -138,8 +138,7 @@ public boolean apply(WindowedValue input) { /** Is {@code window} expired w.r.t. the garbage collection watermark? */ private boolean canDropDueToExpiredWindow(BoundedWindow window) { Instant inputWM = timerInternals.currentInputWatermarkTime(); - return inputWM != null - && window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); + return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); } } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java index a7818a30a037..9fa36b0eb765 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java @@ -103,7 +103,7 @@ private PaneInfo describePane( boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY; // True is the input watermark hasn't passed the window's max timestamp. 
- boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp); + boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp); Timing timing; if (isLateForOutput || !onlyEarlyPanesSoFar) { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java index bdbaf1098e3a..7649d520bb96 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java @@ -146,7 +146,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 2e2d1f6c7d8a..c7180c9074cc 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -499,6 +499,11 @@ private Collection processElement(WindowedValue value) throws Excepti directContext.timestamp(), directContext.timers(), directContext.state()); + + // At this point, if triggerRunner.shouldFire before the processValue then + // triggerRunner.shouldFire after the processValue. In other words adding values + // cannot take a trigger state from firing to non-firing. + // (We don't actually assert this since it is too slow.) 
} return windows; @@ -532,6 +537,10 @@ public void onTimer(TimerData timer) throws Exception { "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } + // If this is an end-of-window timer then, we need to set a GC timer + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + // If this is a garbage collection timer then we should trigger and garbage collect the window. Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); boolean isGarbageCollection = @@ -564,10 +573,12 @@ public void onTimer(TimerData timer) throws Exception { emitIfAppropriate(directContext, renamedContext); } - // If this is an end-of-window timer then, we need to set a GC timer - boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. + // We could assert this but it is very expensive. + // Since we are processing an on-time firing we should schedule the garbage collection // timer. (If getAllowedLateness is zero then the timer event will be considered a // cleanup event and handled by the above). 
@@ -712,11 +723,12 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing */ private void onTrigger( final ReduceFn.Context directContext, - ReduceFn.Context renamedContext, - boolean isFinished) + ReduceFn.Context renamedContext, boolean isFinished) throws Exception { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + // Prefetch necessary states - ReadableState outputTimestampFuture = + ReadableState outputTimestampFuture = watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); ReadableState paneFuture = paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); @@ -729,7 +741,42 @@ private void onTrigger( // Calculate the pane info. final PaneInfo pane = paneFuture.read(); // Extract the window hold, and as a side effect clear it. - final Instant outputTimestamp = outputTimestampFuture.read(); + + WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + final Instant outputTimestamp = pair.oldHold; + @Nullable Instant newHold = pair.newHold; + + if (newHold != null) { + // We can't be finished yet. + Preconditions.checkState( + !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); + // The hold cannot be behind the input watermark. + Preconditions.checkState( + !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); + if (newHold.isAfter(directContext.window().maxTimestamp())) { + // The hold must be for garbage collection, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual( + directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); + } else { + // The hold must be for the end-of-window, which can't have happened yet. 
+ Preconditions.checkState( + newHold.isEqual(directContext.window().maxTimestamp()), + "new hold %s should be at end of window %s", + newHold, + directContext.window()); + Preconditions.checkState( + !inputWM.isAfter(directContext.window().maxTimestamp()), + "new hold at %s for %s but input watermark %s already beyond end of window", + newHold, + directContext.window(), + inputWM); + } + } // Only emit a pane if it has data or empty panes are observable. if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { @@ -778,7 +825,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer( Instant endOfWindow = directContext.window().maxTimestamp(); Instant fireTime; String which; - if (inputWM != null && endOfWindow.isBefore(inputWM)) { + if (endOfWindow.isBefore(inputWM)) { fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness()); which = "garbage collection"; } else { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java index c823ed39b167..b26e6e87b05f 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java @@ -79,10 +79,11 @@ public interface TimerInternals { /** * Return the current, local input watermark timestamp for this computation - * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown. + * in the {@link TimeDomain#EVENT_TIME} time domain. * *

<p>This value: *
<ol> + *
<li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. *
<li>Is monotonically increasing. *
<li>May differ between workers due to network and other delays. *
<li>Will never be ahead of the global input watermark for this computation. But it @@ -95,7 +96,6 @@ public interface TimerInternals { * it is possible for an element to be considered locally on-time even though it is * globally late. */ - @Nullable Instant currentInputWatermarkTime(); /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java index 7d4b4f2abe57..2ddf5245b8f2 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java @@ -54,7 +54,6 @@ public interface Timers { @Nullable public abstract Instant currentSynchronizedProcessingTime(); - /** Returns the current event time or {@code null} if unknown. */ - @Nullable + /** Returns the current event time. */ public abstract Instant currentEventTime(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java index 64ff402a9aec..50e8b324eafb 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java @@ -209,7 +209,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timers.currentEventTime(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index dcfd03516b74..49ccaa3b250a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -172,6 +172,8 @@ public boolean shouldFire(W window, 
Timers timers, StateAccessor state) throw } public void onFire(W window, Timers timers, StateAccessor state) throws Exception { + // shouldFire should be true. + // However it is too expensive to assert. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); Trigger.TriggerContext context = contextFactory.base(window, timers, diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index d537ddb0e80d..7f814c457a61 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -241,7 +241,7 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) if (outputWM != null && elementHold.isBefore(outputWM)) { which = "too late to effect output watermark"; tooLate = true; - } else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM)) { + } else if (context.window().maxTimestamp().isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -287,10 +287,11 @@ private Instant addEndOfWindowHold(ReduceFn.Context context) { // by the end of window (ie the end of window is at or ahead of the input watermark). 
Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); - if (inputWM != null && eowHold.isBefore(inputWM)) { + if (eowHold.isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -329,11 +330,12 @@ private Instant addGarbageCollectionHold(ReduceFn.Context context) { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM); - Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM), + Preconditions.checkState(!gcHold.isBefore(inputWM), "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM); context.state().access(EXTRA_HOLD_TAG).add(gcHold); return gcHold; @@ -368,6 +370,19 @@ public void onMerge(ReduceFn.OnMergeContext context) { addEndOfWindowOrGarbageCollectionHolds(context); } + /** + * Result of {@link #extractAndRelease}. + */ + public static class OldAndNewHolds { + public final Instant oldHold; + @Nullable public final Instant newHold; + + public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { + this.oldHold = oldHold; + this.newHold = newHold; + } + } + /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. @@ -377,7 +392,7 @@ public void onMerge(ReduceFn.OnMergeContext context) { * elements in the current pane. If there is no such value the timestamp is the end * of the window. 
*/ - public ReadableState extractAndRelease( + public ReadableState extractAndRelease( final ReduceFn.Context context, final boolean isFinished) { WindowTracing.debug( "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -385,38 +400,38 @@ public ReadableState extractAndRelease( timerInternals.currentOutputWatermarkTime()); final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag); final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); - return new ReadableState() { + return new ReadableState() { @Override - public ReadableState readLater() { + public ReadableState readLater() { elementHoldState.readLater(); extraHoldState.readLater(); return this; } @Override - public Instant read() { + public OldAndNewHolds read() { // Read both the element and extra holds. Instant elementHold = elementHoldState.read(); Instant extraHold = extraHoldState.read(); - Instant hold; + Instant oldHold; // Find the minimum, accounting for null. if (elementHold == null) { - hold = extraHold; + oldHold = extraHold; } else if (extraHold == null) { - hold = elementHold; + oldHold = elementHold; } else if (elementHold.isBefore(extraHold)) { - hold = elementHold; + oldHold = elementHold; } else { - hold = extraHold; + oldHold = extraHold; } - if (hold == null || hold.isAfter(context.window().maxTimestamp())) { + if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { // If no hold (eg because all elements came in behind the output watermark), or // the hold was for garbage collection, take the end of window as the result. 
WindowTracing.debug( "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " + "for key:{}; window:{}", - hold, context.key(), context.window()); - hold = context.window().maxTimestamp(); + oldHold, context.key(), context.window()); + oldHold = context.window().maxTimestamp(); } WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", context.key(), context.window()); @@ -425,13 +440,14 @@ public Instant read() { elementHoldState.clear(); extraHoldState.clear(); + @Nullable Instant newHold = null; if (!isFinished) { // Only need to leave behind an end-of-window or garbage collection hold // if future elements will be processed. - addEndOfWindowOrGarbageCollectionHolds(context); + newHold = addEndOfWindowOrGarbageCollectionHolds(context); } - return hold; + return new OldAndNewHolds(oldHold, newHold); } }; } @@ -447,4 +463,12 @@ public void clearHolds(ReduceFn.Context context) { context.state().access(elementHoldTag).clear(); context.state().access(EXTRA_HOLD_TAG).clear(); } + + /** + * Return the current data hold, or null if none. Does not clear. For debugging only. + */ + @Nullable + public Instant getDataCurrent(ReduceFn.Context context) { + return context.state().access(elementHoldTag).read(); + } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java index 1daadc7e2b7f..cc609532ba76 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java @@ -1,3 +1,4 @@ + /* * Copyright (C) 2016 Google Inc. 
* diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java index d4620a7827c5..4aeaa0cb99ac 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java @@ -599,7 +599,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -666,9 +666,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -692,7 +691,7 @@ public void advanceInputWatermark( ReduceFnRunner runner, Instant newInputWatermark) throws Exception { Preconditions.checkNotNull(newInputWatermark); Preconditions.checkState( - inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + !newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -713,7 +712,6 @@ public void advanceInputWatermark( public void advanceOutputWatermark(Instant newOutputWatermark) { Preconditions.checkNotNull(newOutputWatermark); - Preconditions.checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java 
index 0c7183020291..f291438f8bda 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java @@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState; import com.google.cloud.dataflow.sdk.values.TimestampedValue; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -428,7 +429,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -471,9 +472,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -495,7 +495,7 @@ public String toString() { public void advanceInputWatermark(Instant newInputWatermark) throws Exception { checkNotNull(newInputWatermark); - checkState(inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + checkState(!newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -513,7 +513,6 @@ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception { checkNotNull(newOutputWatermark); - checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} 
to {}", @@ -577,7 +576,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); }