From cf3638e16f1e62f224f4bbcc5975de775688c229 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 29 Feb 2016 15:30:08 -0800 Subject: [PATCH 1/3] [BEAM-80] Fix the Unexpected StateTag error in convertToBagTagInternal It adds asBagTag() in KeyedCombiningValueWithContextStateTag, and allows convertToBagTagInternal accept it. --- .../dataflow/sdk/util/CombineFnUtil.java | 57 ++++++++++++++++ .../dataflow/sdk/util/state/StateTags.java | 24 +++++-- .../dataflow/sdk/util/state/StateTagTest.java | 65 +++++++++++++++++++ 3 files changed, 139 insertions(+), 7 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java index 6201e6e7bb..d9744800b9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java @@ -19,7 +19,10 @@ import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.CoderRegistry; +import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn; +import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn; +import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext; import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context; import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import com.google.cloud.dataflow.sdk.util.state.StateContext; @@ -46,6 +49,60 @@ public class CombineFnUtil { return new NonSerializableBoundedKeyedCombineFn<>(combineFn, context); } + /** + * Return a {@link CombineFnWithContext} from the given {@link GlobalCombineFn}. + */ + public static + CombineFnWithContext toFnWithContext( + GlobalCombineFn globalCombineFn) { + if (globalCombineFn instanceof CombineFnWithContext) { + @SuppressWarnings("unchecked") + CombineFnWithContext combineFnWithContext = + (CombineFnWithContext) globalCombineFn; + return combineFnWithContext; + } else { + @SuppressWarnings("unchecked") + final CombineFn combineFn = + (CombineFn) globalCombineFn; + return new CombineFnWithContext() { + @Override + public AccumT createAccumulator(Context c) { + return combineFn.createAccumulator(); + } + @Override + public AccumT addInput(AccumT accumulator, InputT input, Context c) { + return combineFn.addInput(accumulator, input); + } + @Override + public AccumT mergeAccumulators(Iterable accumulators, Context c) { + return combineFn.mergeAccumulators(accumulators); + } + @Override + public OutputT extractOutput(AccumT accumulator, Context c) { + return combineFn.extractOutput(accumulator); + } + @Override + public AccumT compact(AccumT accumulator, Context c) { + return combineFn.compact(accumulator); + } + @Override + public OutputT defaultValue() { + return combineFn.defaultValue(); + } + @Override + public Coder getAccumulatorCoder(CoderRegistry registry, Coder inputCoder) + throws CannotProvideCoderException { + return combineFn.getAccumulatorCoder(registry, inputCoder); + } + @Override + public Coder getDefaultOutputCoder( + CoderRegistry registry, Coder inputCoder) throws CannotProvideCoderException { + return combineFn.getDefaultOutputCoder(registry, inputCoder); + } + }; + } + } + private static class NonSerializableBoundedKeyedCombineFn extends KeyedCombineFn { private final KeyedCombineFnWithContext combineFn; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java index 0cbaa52369..ec9a78feab 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java @@ -181,15 +181,21 @@ public static StateTag makeSystemTagInterna public static StateTag> convertToBagTagInternal( StateTag> combiningTag) { - if (!(combiningTag instanceof KeyedCombiningValueStateTag)) { + if (combiningTag instanceof KeyedCombiningValueStateTag) { + // Checked above; conversion to a bag tag depends on the provided tag being one of those + // created via the factory methods in this class. + @SuppressWarnings("unchecked") + KeyedCombiningValueStateTag typedTag = + (KeyedCombiningValueStateTag) combiningTag; + return typedTag.asBagTag(); + } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) { + @SuppressWarnings("unchecked") + KeyedCombiningValueWithContextStateTag typedTag = + (KeyedCombiningValueWithContextStateTag) combiningTag; + return typedTag.asBagTag(); + } else { throw new IllegalArgumentException("Unexpected StateTag " + combiningTag); } - // Checked above; conversion to a bag tag depends on the provided tag being one of those - // created via the factory methods in this class. - @SuppressWarnings("unchecked") - KeyedCombiningValueStateTag typedTag = - (KeyedCombiningValueStateTag) combiningTag; - return typedTag.asBagTag(); } private static class StructuredId implements Serializable { @@ -413,6 +419,10 @@ public StateTag> asKind( return new KeyedCombiningValueWithContextStateTag<>( id.asKind(kind), accumCoder, combineFn); } + + private StateTag> asBagTag() { + return new BagStateTag(id, accumCoder); + } } /** diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java index 47f7224e90..b4ff36fae3 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java @@ -20,12 +20,15 @@ import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderRegistry; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.transforms.Combine.Holder; import com.google.cloud.dataflow.sdk.transforms.Max; import com.google.cloud.dataflow.sdk.transforms.Max.MaxIntegerFn; import com.google.cloud.dataflow.sdk.transforms.Min; import com.google.cloud.dataflow.sdk.transforms.Min.MinIntegerFn; import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; +import com.google.cloud.dataflow.sdk.util.CombineFnUtil; import org.junit.Test; import org.junit.runner.RunWith; @@ -80,6 +83,7 @@ public void testWatermarkBagEquality() { assertEquals(bar, bar2); } + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testCombiningValueEquality() { MaxIntegerFn maxFn = new Max.MaxIntegerFn(); @@ -96,13 +100,74 @@ public void testCombiningValueEquality() { // Same name, coder and combineFn assertEquals(fooCoder1Max1, fooCoder1Max2); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max2)); + + // Different combineFn, but we treat them as equal since we only serialize the bits. + assertEquals(fooCoder1Max1, fooCoder1Min); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Min)); + + // Different input coder coder. + assertNotEquals(fooCoder1Max1, fooCoder2Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder2Max)); + + // These StateTags have different IDs. + assertNotEquals(fooCoder1Max1, barCoder1Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) barCoder1Max)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testCombiningValueWithContextEquality() { + CoderRegistry registry = new CoderRegistry(); + registry.registerStandardCoders(); + + MaxIntegerFn maxFn = new Max.MaxIntegerFn(); + MinIntegerFn minFn = new Min.MinIntegerFn(); + + Coder> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of()); + Coder> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of()); + + StateTag fooCoder1Max1 = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag fooCoder1Max2 = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag fooCoder1Min = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(minFn).asKeyedFn()); + + StateTag fooCoder2Max = StateTags.keyedCombiningValueWithContext( + "foo", accum2, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag barCoder1Max = StateTags.keyedCombiningValueWithContext( + "bar", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + + // Same name, coder and combineFn + assertEquals(fooCoder1Max1, fooCoder1Max2); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max2)); // Different combineFn, but we treat them as equal since we only serialize the bits. assertEquals(fooCoder1Max1, fooCoder1Min); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Min)); // Different input coder coder. assertNotEquals(fooCoder1Max1, fooCoder2Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder2Max)); // These StateTags have different IDs. assertNotEquals(fooCoder1Max1, barCoder1Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) barCoder1Max)); } } From ff6617f6d22c487991d7edbbf4a4b607b2b2f6ca Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Wed, 2 Mar 2016 20:45:59 -0800 Subject: [PATCH 2/3] Basic non-null checks --- .../dataflow/sdk/util/ReduceFnRunner.java | 51 ++++++++++++++++- .../dataflow/sdk/util/TriggerRunner.java | 2 + .../dataflow/sdk/util/WatermarkHold.java | 55 ++++++++++++++----- 3 files changed, 92 insertions(+), 16 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 2e2d1f6c7d..f1d45828f9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -499,6 +499,11 @@ private Collection processElement(WindowedValue value) throws Excepti directContext.timestamp(), directContext.timers(), directContext.state()); + + // At this point, if triggerRunner.shouldFire before the processValue then + // triggerRunner.shouldFire after the processValue. In other words adding values + // cannot take a trigger state from firing to non-firing. + // (We don't actually assert this since it is too slow.) } return windows; @@ -568,6 +573,11 @@ public void onTimer(TimerData timer) throws Exception { boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. + // We could assert this but it is very expensive. + // Since we are processing an on-time firing we should schedule the garbage collection // timer. (If getAllowedLateness is zero then the timer event will be considered a // cleanup event and handled by the above). @@ -715,8 +725,11 @@ private void onTrigger( ReduceFn.Context renamedContext, boolean isFinished) throws Exception { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + // Prefetch necessary states - ReadableState outputTimestampFuture = + ReadableState outputTimestampFuture = watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); ReadableState paneFuture = paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); @@ -729,7 +742,41 @@ private void onTrigger( // Calculate the pane info. final PaneInfo pane = paneFuture.read(); // Extract the window hold, and as a side effect clear it. - final Instant outputTimestamp = outputTimestampFuture.read(); + + WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + final Instant outputTimestamp = pair.oldHold; + @Nullable Instant newHold = pair.newHold; + + if (newHold != null && inputWM != null) { + // We can't be finished yet. + Preconditions.checkState( + !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); + // The hold cannot be behind the input watermark. + Preconditions.checkState( + !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); + if (newHold.isAfter(directContext.window().maxTimestamp())) { + // The hold must be for garbage collection, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual( + directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); + } else { + // The hold must be for the end-of-window, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual(directContext.window().maxTimestamp()), + "new hold %s should be at end of window %s", + newHold, + directContext.window()); + Preconditions.checkState( + !isEndOfWindow, + "new hold at %s for %s but this is the watermark trigger", + newHold, + directContext.window()); + } + } // Only emit a pane if it has data or empty panes are observable. if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index dcfd03516b..8fc498112e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -172,6 +172,8 @@ public boolean shouldFire(W window, Timers timers, StateAccessor state) throw } public void onFire(W window, Timers timers, StateAccessor state) throws Exception { + // shouldFire should be false. + // However it is too expensive to assert. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); Trigger.TriggerContext context = contextFactory.base(window, timers, diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index d537ddb0e8..31e36c5ef3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -228,6 +228,7 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); // Only add the hold if we can be sure: // - the backend will be able to respect it @@ -287,6 +288,8 @@ private Instant addEndOfWindowHold(ReduceFn.Context context) { // by the end of window (ie the end of window is at or ahead of the input watermark). Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); @@ -329,6 +332,8 @@ private Instant addGarbageCollectionHold(ReduceFn.Context context) { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -368,6 +373,19 @@ public void onMerge(ReduceFn.OnMergeContext context) { addEndOfWindowOrGarbageCollectionHolds(context); } + /** + * Result of {@link #extractAndRelease}. + */ + public static class OldAndNewHolds { + public final Instant oldHold; + @Nullable public final Instant newHold; + + public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { + this.oldHold = oldHold; + this.newHold = newHold; + } + } + /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. @@ -377,7 +395,7 @@ public void onMerge(ReduceFn.OnMergeContext context) { * elements in the current pane. If there is no such value the timestamp is the end * of the window. */ - public ReadableState extractAndRelease( + public ReadableState extractAndRelease( final ReduceFn.Context context, final boolean isFinished) { WindowTracing.debug( "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -385,38 +403,38 @@ public ReadableState extractAndRelease( timerInternals.currentOutputWatermarkTime()); final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag); final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); - return new ReadableState() { + return new ReadableState() { @Override - public ReadableState readLater() { + public ReadableState readLater() { elementHoldState.readLater(); extraHoldState.readLater(); return this; } @Override - public Instant read() { + public OldAndNewHolds read() { // Read both the element and extra holds. Instant elementHold = elementHoldState.read(); Instant extraHold = extraHoldState.read(); - Instant hold; + Instant oldHold; // Find the minimum, accounting for null. if (elementHold == null) { - hold = extraHold; + oldHold = extraHold; } else if (extraHold == null) { - hold = elementHold; + oldHold = elementHold; } else if (elementHold.isBefore(extraHold)) { - hold = elementHold; + oldHold = elementHold; } else { - hold = extraHold; + oldHold = extraHold; } - if (hold == null || hold.isAfter(context.window().maxTimestamp())) { + if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { // If no hold (eg because all elements came in behind the output watermark), or // the hold was for garbage collection, take the end of window as the result. WindowTracing.debug( "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " + "for key:{}; window:{}", - hold, context.key(), context.window()); - hold = context.window().maxTimestamp(); + oldHold, context.key(), context.window()); + oldHold = context.window().maxTimestamp(); } WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", context.key(), context.window()); @@ -425,13 +443,14 @@ public Instant read() { elementHoldState.clear(); extraHoldState.clear(); + @Nullable Instant newHold = null; if (!isFinished) { // Only need to leave behind an end-of-window or garbage collection hold // if future elements will be processed. - addEndOfWindowOrGarbageCollectionHolds(context); + newHold = addEndOfWindowOrGarbageCollectionHolds(context); } - return hold; + return new OldAndNewHolds(oldHold, newHold); } }; } @@ -447,4 +466,12 @@ public void clearHolds(ReduceFn.Context context) { context.state().access(elementHoldTag).clear(); context.state().access(EXTRA_HOLD_TAG).clear(); } + + /** + * Return the current data hold, or null if none. Does not clear. For debugging only. + */ + @Nullable + public Instant getDataCurrent(ReduceFn.Context context) { + return context.state().access(elementHoldTag).read(); + } } From c2e002a73acf3b692c6cfcae8bedccdf16677a74 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Fri, 4 Mar 2016 15:16:33 -0800 Subject: [PATCH 3/3] Input watermarks can never be null. --- .../inprocess/InMemoryWatermarkManager.java | 3 ++- .../inprocess/InProcessTimerInternals.java | 1 - .../sdk/util/LateDataDroppingDoFnRunner.java | 3 +-- .../dataflow/sdk/util/PaneInfoTracker.java | 2 +- .../sdk/util/ReduceFnContextFactory.java | 1 - .../dataflow/sdk/util/ReduceFnRunner.java | 18 +++++++++--------- .../dataflow/sdk/util/TimerInternals.java | 4 ++-- .../google/cloud/dataflow/sdk/util/Timers.java | 3 +-- .../sdk/util/TriggerContextFactory.java | 1 - .../cloud/dataflow/sdk/util/WatermarkHold.java | 9 +++------ .../dataflow/sdk/io/CountingInputTest.java | 1 + .../dataflow/sdk/util/ReduceFnTester.java | 8 +++----- .../cloud/dataflow/sdk/util/TriggerTester.java | 10 ++++------ 13 files changed, 27 insertions(+), 37 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index a9a62a6aa3..c4d67db121 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -994,7 +995,7 @@ private TransformWatermarks( * Returns the input watermark of the {@link AppliedPTransform}. */ public Instant getInputWatermark() { - return inputWatermark.get(); + return Preconditions.checkNotNull(inputWatermark.get()); } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java index 06ba7b82f4..1d075c54c2 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java @@ -70,7 +70,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { return watermarks.getInputWatermark(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java index 31927ab882..3dfa06474e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java @@ -138,8 +138,7 @@ public boolean apply(WindowedValue input) { /** Is {@code window} expired w.r.t. the garbage collection watermark? */ private boolean canDropDueToExpiredWindow(BoundedWindow window) { Instant inputWM = timerInternals.currentInputWatermarkTime(); - return inputWM != null - && window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); + return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java index a7818a30a0..9fa36b0eb7 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java @@ -103,7 +103,7 @@ private PaneInfo describePane( boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY; // True is the input watermark hasn't passed the window's max timestamp. - boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp); + boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp); Timing timing; if (isLateForOutput || !onlyEarlyPanesSoFar) { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java index bdbaf1098e..7649d520bb 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java @@ -146,7 +146,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index f1d45828f9..560d8ecd44 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -537,6 +537,10 @@ public void onTimer(TimerData timer) throws Exception { "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } + // If this is an end-of-window timer then, we need to set a GC timer + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + // If this is a garbage collection timer then we should trigger and garbage collect the window. Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); boolean isGarbageCollection = @@ -553,7 +557,7 @@ public void onTimer(TimerData timer) throws Exception { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. - onTrigger(directContext, renamedContext, true/* isFinished */); + onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); } // Cleanup flavor B: Clear all the remaining state for this window since we'll never @@ -569,9 +573,6 @@ public void onTimer(TimerData timer) throws Exception { emitIfAppropriate(directContext, renamedContext); } - // If this is an end-of-window timer then, we need to set a GC timer - boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { // If the window strategy trigger includes a watermark trigger then at this point // there should be no data holds, either because we'd already cleared them on an @@ -676,7 +677,7 @@ private void emitIfAppropriate(ReduceFn.Context directCon // Run onTrigger to produce the actual pane contents. // As a side effect it will clear all element holds, but not necessarily any // end-of-window or garbage collection holds. - onTrigger(directContext, renamedContext, isFinished); + onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/); // Now that we've triggered, the pane is empty. nonEmptyPanes.clearPane(renamedContext.state()); @@ -723,10 +724,9 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing private void onTrigger( final ReduceFn.Context directContext, ReduceFn.Context renamedContext, - boolean isFinished) + boolean isFinished, boolean isEndOfWindow) throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); // Prefetch necessary states ReadableState outputTimestampFuture = @@ -747,7 +747,7 @@ private void onTrigger( final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; - if (newHold != null && inputWM != null) { + if (newHold != null) { // We can't be finished yet. Preconditions.checkState( !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); @@ -825,7 +825,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer( Instant endOfWindow = directContext.window().maxTimestamp(); Instant fireTime; String which; - if (inputWM != null && endOfWindow.isBefore(inputWM)) { + if (endOfWindow.isBefore(inputWM)) { fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness()); which = "garbage collection"; } else { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java index c823ed39b1..b26e6e87b0 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java @@ -79,10 +79,11 @@ public interface TimerInternals { /** * Return the current, local input watermark timestamp for this computation - * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown. + * in the {@link TimeDomain#EVENT_TIME} time domain. * *

This value: *

    + *
  1. Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. *
  2. Is monotonically increasing. *
  3. May differ between workers due to network and other delays. *
  4. Will never be ahead of the global input watermark for this computation. But it @@ -95,7 +96,6 @@ public interface TimerInternals { * it is possible for an element to be considered locally on-time even though it is * globally late. */ - @Nullable Instant currentInputWatermarkTime(); /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java index 7d4b4f2abe..2ddf5245b8 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java @@ -54,7 +54,6 @@ public interface Timers { @Nullable public abstract Instant currentSynchronizedProcessingTime(); - /** Returns the current event time or {@code null} if unknown. */ - @Nullable + /** Returns the current event time. */ public abstract Instant currentEventTime(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java index 64ff402a9a..50e8b324ea 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java @@ -209,7 +209,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timers.currentEventTime(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index 31e36c5ef3..7f814c457a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -228,7 +228,6 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); // Only add the hold if we can be sure: // - the backend will be able to respect it @@ -242,7 +241,7 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) if (outputWM != null && elementHold.isBefore(outputWM)) { which = "too late to effect output watermark"; tooLate = true; - } else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM)) { + } else if (context.window().maxTimestamp().isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -288,12 +287,11 @@ private Instant addEndOfWindowHold(ReduceFn.Context context) { // by the end of window (ie the end of window is at or ahead of the input watermark). Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); - if (inputWM != null && eowHold.isBefore(inputWM)) { + if (eowHold.isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -332,13 +330,12 @@ private Instant addGarbageCollectionHold(ReduceFn.Context context) { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM); - Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM), + Preconditions.checkState(!gcHold.isBefore(inputWM), "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM); context.state().access(EXTRA_HOLD_TAG).add(gcHold); return gcHold; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java index 1daadc7e2b..cc609532ba 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java @@ -1,3 +1,4 @@ + /* * Copyright (C) 2016 Google Inc. * diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java index d4620a7827..4aeaa0cb99 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java @@ -599,7 +599,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -666,9 +666,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -692,7 +691,7 @@ public void advanceInputWatermark( ReduceFnRunner runner, Instant newInputWatermark) throws Exception { Preconditions.checkNotNull(newInputWatermark); Preconditions.checkState( - inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + !newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -713,7 +712,6 @@ public void advanceInputWatermark( public void advanceOutputWatermark(Instant newOutputWatermark) { Preconditions.checkNotNull(newOutputWatermark); - Preconditions.checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java index 0c71830202..f291438f8b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java @@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState; import com.google.cloud.dataflow.sdk.values.TimestampedValue; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -428,7 +429,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -471,9 +472,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -495,7 +495,7 @@ public String toString() { public void advanceInputWatermark(Instant newInputWatermark) throws Exception { checkNotNull(newInputWatermark); - checkState(inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + checkState(!newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -513,7 +513,6 @@ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception { checkNotNull(newOutputWatermark); - checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", @@ -577,7 +576,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); }