From cf3638e16f1e62f224f4bbcc5975de775688c229 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 29 Feb 2016 15:30:08 -0800 Subject: [PATCH 1/4] [BEAM-80] Fix the Unexpected StateTag error in convertToBagTagInternal It adds asBagTag() in KeyedCombiningValueWithContextStateTag, and allows convertToBagTagInternal accept it. --- .../dataflow/sdk/util/CombineFnUtil.java | 57 ++++++++++++++++ .../dataflow/sdk/util/state/StateTags.java | 24 +++++-- .../dataflow/sdk/util/state/StateTagTest.java | 65 +++++++++++++++++++ 3 files changed, 139 insertions(+), 7 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java index 6201e6e7bb..d9744800b9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/CombineFnUtil.java @@ -19,7 +19,10 @@ import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.CoderRegistry; +import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn; +import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn; +import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext; import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context; import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import com.google.cloud.dataflow.sdk.util.state.StateContext; @@ -46,6 +49,60 @@ public class CombineFnUtil { return new NonSerializableBoundedKeyedCombineFn<>(combineFn, context); } + /** + * Return a {@link CombineFnWithContext} from the given {@link GlobalCombineFn}. + */ + public static + CombineFnWithContext toFnWithContext( + GlobalCombineFn globalCombineFn) { + if (globalCombineFn instanceof CombineFnWithContext) { + @SuppressWarnings("unchecked") + CombineFnWithContext combineFnWithContext = + (CombineFnWithContext) globalCombineFn; + return combineFnWithContext; + } else { + @SuppressWarnings("unchecked") + final CombineFn combineFn = + (CombineFn) globalCombineFn; + return new CombineFnWithContext() { + @Override + public AccumT createAccumulator(Context c) { + return combineFn.createAccumulator(); + } + @Override + public AccumT addInput(AccumT accumulator, InputT input, Context c) { + return combineFn.addInput(accumulator, input); + } + @Override + public AccumT mergeAccumulators(Iterable accumulators, Context c) { + return combineFn.mergeAccumulators(accumulators); + } + @Override + public OutputT extractOutput(AccumT accumulator, Context c) { + return combineFn.extractOutput(accumulator); + } + @Override + public AccumT compact(AccumT accumulator, Context c) { + return combineFn.compact(accumulator); + } + @Override + public OutputT defaultValue() { + return combineFn.defaultValue(); + } + @Override + public Coder getAccumulatorCoder(CoderRegistry registry, Coder inputCoder) + throws CannotProvideCoderException { + return combineFn.getAccumulatorCoder(registry, inputCoder); + } + @Override + public Coder getDefaultOutputCoder( + CoderRegistry registry, Coder inputCoder) throws CannotProvideCoderException { + return combineFn.getDefaultOutputCoder(registry, inputCoder); + } + }; + } + } + private static class NonSerializableBoundedKeyedCombineFn extends KeyedCombineFn { private final KeyedCombineFnWithContext combineFn; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java index 0cbaa52369..ec9a78feab 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateTags.java @@ -181,15 +181,21 @@ public static StateTag makeSystemTagInterna public static StateTag> convertToBagTagInternal( StateTag> combiningTag) { - if (!(combiningTag instanceof KeyedCombiningValueStateTag)) { + if (combiningTag instanceof KeyedCombiningValueStateTag) { + // Checked above; conversion to a bag tag depends on the provided tag being one of those + // created via the factory methods in this class. + @SuppressWarnings("unchecked") + KeyedCombiningValueStateTag typedTag = + (KeyedCombiningValueStateTag) combiningTag; + return typedTag.asBagTag(); + } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) { + @SuppressWarnings("unchecked") + KeyedCombiningValueWithContextStateTag typedTag = + (KeyedCombiningValueWithContextStateTag) combiningTag; + return typedTag.asBagTag(); + } else { throw new IllegalArgumentException("Unexpected StateTag " + combiningTag); } - // Checked above; conversion to a bag tag depends on the provided tag being one of those - // created via the factory methods in this class. - @SuppressWarnings("unchecked") - KeyedCombiningValueStateTag typedTag = - (KeyedCombiningValueStateTag) combiningTag; - return typedTag.asBagTag(); } private static class StructuredId implements Serializable { @@ -413,6 +419,10 @@ public StateTag> asKind( return new KeyedCombiningValueWithContextStateTag<>( id.asKind(kind), accumCoder, combineFn); } + + private StateTag> asBagTag() { + return new BagStateTag(id, accumCoder); + } } /** diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java index 47f7224e90..b4ff36fae3 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/state/StateTagTest.java @@ -20,12 +20,15 @@ import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderRegistry; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.transforms.Combine.Holder; import com.google.cloud.dataflow.sdk.transforms.Max; import com.google.cloud.dataflow.sdk.transforms.Max.MaxIntegerFn; import com.google.cloud.dataflow.sdk.transforms.Min; import com.google.cloud.dataflow.sdk.transforms.Min.MinIntegerFn; import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; +import com.google.cloud.dataflow.sdk.util.CombineFnUtil; import org.junit.Test; import org.junit.runner.RunWith; @@ -80,6 +83,7 @@ public void testWatermarkBagEquality() { assertEquals(bar, bar2); } + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testCombiningValueEquality() { MaxIntegerFn maxFn = new Max.MaxIntegerFn(); @@ -96,13 +100,74 @@ public void testCombiningValueEquality() { // Same name, coder and combineFn assertEquals(fooCoder1Max1, fooCoder1Max2); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max2)); + + // Different combineFn, but we treat them as equal since we only serialize the bits. + assertEquals(fooCoder1Max1, fooCoder1Min); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Min)); + + // Different input coder coder. + assertNotEquals(fooCoder1Max1, fooCoder2Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder2Max)); + + // These StateTags have different IDs. + assertNotEquals(fooCoder1Max1, barCoder1Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) barCoder1Max)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testCombiningValueWithContextEquality() { + CoderRegistry registry = new CoderRegistry(); + registry.registerStandardCoders(); + + MaxIntegerFn maxFn = new Max.MaxIntegerFn(); + MinIntegerFn minFn = new Min.MinIntegerFn(); + + Coder> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of()); + Coder> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of()); + + StateTag fooCoder1Max1 = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag fooCoder1Max2 = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag fooCoder1Min = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(minFn).asKeyedFn()); + + StateTag fooCoder2Max = StateTags.keyedCombiningValueWithContext( + "foo", accum2, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag barCoder1Max = StateTags.keyedCombiningValueWithContext( + "bar", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + + // Same name, coder and combineFn + assertEquals(fooCoder1Max1, fooCoder1Max2); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max2)); // Different combineFn, but we treat them as equal since we only serialize the bits. assertEquals(fooCoder1Max1, fooCoder1Min); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Min)); // Different input coder coder. assertNotEquals(fooCoder1Max1, fooCoder2Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder2Max)); // These StateTags have different IDs. assertNotEquals(fooCoder1Max1, barCoder1Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) barCoder1Max)); } } From ff6617f6d22c487991d7edbbf4a4b607b2b2f6ca Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Wed, 2 Mar 2016 20:45:59 -0800 Subject: [PATCH 2/4] Basic non-null checks --- .../dataflow/sdk/util/ReduceFnRunner.java | 51 ++++++++++++++++- .../dataflow/sdk/util/TriggerRunner.java | 2 + .../dataflow/sdk/util/WatermarkHold.java | 55 ++++++++++++++----- 3 files changed, 92 insertions(+), 16 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 2e2d1f6c7d..f1d45828f9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -499,6 +499,11 @@ private Collection processElement(WindowedValue value) throws Excepti directContext.timestamp(), directContext.timers(), directContext.state()); + + // At this point, if triggerRunner.shouldFire before the processValue then + // triggerRunner.shouldFire after the processValue. In other words adding values + // cannot take a trigger state from firing to non-firing. + // (We don't actually assert this since it is too slow.) } return windows; @@ -568,6 +573,11 @@ public void onTimer(TimerData timer) throws Exception { boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. + // We could assert this but it is very expensive. + // Since we are processing an on-time firing we should schedule the garbage collection // timer. (If getAllowedLateness is zero then the timer event will be considered a // cleanup event and handled by the above). @@ -715,8 +725,11 @@ private void onTrigger( ReduceFn.Context renamedContext, boolean isFinished) throws Exception { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + // Prefetch necessary states - ReadableState outputTimestampFuture = + ReadableState outputTimestampFuture = watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); ReadableState paneFuture = paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); @@ -729,7 +742,41 @@ private void onTrigger( // Calculate the pane info. final PaneInfo pane = paneFuture.read(); // Extract the window hold, and as a side effect clear it. - final Instant outputTimestamp = outputTimestampFuture.read(); + + WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + final Instant outputTimestamp = pair.oldHold; + @Nullable Instant newHold = pair.newHold; + + if (newHold != null && inputWM != null) { + // We can't be finished yet. + Preconditions.checkState( + !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); + // The hold cannot be behind the input watermark. + Preconditions.checkState( + !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); + if (newHold.isAfter(directContext.window().maxTimestamp())) { + // The hold must be for garbage collection, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual( + directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); + } else { + // The hold must be for the end-of-window, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual(directContext.window().maxTimestamp()), + "new hold %s should be at end of window %s", + newHold, + directContext.window()); + Preconditions.checkState( + !isEndOfWindow, + "new hold at %s for %s but this is the watermark trigger", + newHold, + directContext.window()); + } + } // Only emit a pane if it has data or empty panes are observable. if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index dcfd03516b..8fc498112e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -172,6 +172,8 @@ public boolean shouldFire(W window, Timers timers, StateAccessor state) throw } public void onFire(W window, Timers timers, StateAccessor state) throws Exception { + // shouldFire should be false. + // However it is too expensive to assert. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); Trigger.TriggerContext context = contextFactory.base(window, timers, diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index d537ddb0e8..31e36c5ef3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -228,6 +228,7 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); // Only add the hold if we can be sure: // - the backend will be able to respect it @@ -287,6 +288,8 @@ private Instant addEndOfWindowHold(ReduceFn.Context context) { // by the end of window (ie the end of window is at or ahead of the input watermark). Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); @@ -329,6 +332,8 @@ private Instant addGarbageCollectionHold(ReduceFn.Context context) { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -368,6 +373,19 @@ public void onMerge(ReduceFn.OnMergeContext context) { addEndOfWindowOrGarbageCollectionHolds(context); } + /** + * Result of {@link #extractAndRelease}. + */ + public static class OldAndNewHolds { + public final Instant oldHold; + @Nullable public final Instant newHold; + + public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { + this.oldHold = oldHold; + this.newHold = newHold; + } + } + /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. @@ -377,7 +395,7 @@ public void onMerge(ReduceFn.OnMergeContext context) { * elements in the current pane. If there is no such value the timestamp is the end * of the window. */ - public ReadableState extractAndRelease( + public ReadableState extractAndRelease( final ReduceFn.Context context, final boolean isFinished) { WindowTracing.debug( "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -385,38 +403,38 @@ public ReadableState extractAndRelease( timerInternals.currentOutputWatermarkTime()); final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag); final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); - return new ReadableState() { + return new ReadableState() { @Override - public ReadableState readLater() { + public ReadableState readLater() { elementHoldState.readLater(); extraHoldState.readLater(); return this; } @Override - public Instant read() { + public OldAndNewHolds read() { // Read both the element and extra holds. Instant elementHold = elementHoldState.read(); Instant extraHold = extraHoldState.read(); - Instant hold; + Instant oldHold; // Find the minimum, accounting for null. if (elementHold == null) { - hold = extraHold; + oldHold = extraHold; } else if (extraHold == null) { - hold = elementHold; + oldHold = elementHold; } else if (elementHold.isBefore(extraHold)) { - hold = elementHold; + oldHold = elementHold; } else { - hold = extraHold; + oldHold = extraHold; } - if (hold == null || hold.isAfter(context.window().maxTimestamp())) { + if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { // If no hold (eg because all elements came in behind the output watermark), or // the hold was for garbage collection, take the end of window as the result. WindowTracing.debug( "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " + "for key:{}; window:{}", - hold, context.key(), context.window()); - hold = context.window().maxTimestamp(); + oldHold, context.key(), context.window()); + oldHold = context.window().maxTimestamp(); } WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", context.key(), context.window()); @@ -425,13 +443,14 @@ public Instant read() { elementHoldState.clear(); extraHoldState.clear(); + @Nullable Instant newHold = null; if (!isFinished) { // Only need to leave behind an end-of-window or garbage collection hold // if future elements will be processed. - addEndOfWindowOrGarbageCollectionHolds(context); + newHold = addEndOfWindowOrGarbageCollectionHolds(context); } - return hold; + return new OldAndNewHolds(oldHold, newHold); } }; } @@ -447,4 +466,12 @@ public void clearHolds(ReduceFn.Context context) { context.state().access(elementHoldTag).clear(); context.state().access(EXTRA_HOLD_TAG).clear(); } + + /** + * Return the current data hold, or null if none. Does not clear. For debugging only. + */ + @Nullable + public Instant getDataCurrent(ReduceFn.Context context) { + return context.state().access(elementHoldTag).read(); + } } From c2e002a73acf3b692c6cfcae8bedccdf16677a74 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Fri, 4 Mar 2016 15:16:33 -0800 Subject: [PATCH 3/4] Input watermarks can never be null. --- .../inprocess/InMemoryWatermarkManager.java | 3 ++- .../inprocess/InProcessTimerInternals.java | 1 - .../sdk/util/LateDataDroppingDoFnRunner.java | 3 +-- .../dataflow/sdk/util/PaneInfoTracker.java | 2 +- .../sdk/util/ReduceFnContextFactory.java | 1 - .../dataflow/sdk/util/ReduceFnRunner.java | 18 +++++++++--------- .../dataflow/sdk/util/TimerInternals.java | 4 ++-- .../google/cloud/dataflow/sdk/util/Timers.java | 3 +-- .../sdk/util/TriggerContextFactory.java | 1 - .../cloud/dataflow/sdk/util/WatermarkHold.java | 9 +++------ .../dataflow/sdk/io/CountingInputTest.java | 1 + .../dataflow/sdk/util/ReduceFnTester.java | 8 +++----- .../cloud/dataflow/sdk/util/TriggerTester.java | 10 ++++------ 13 files changed, 27 insertions(+), 37 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index a9a62a6aa3..c4d67db121 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -994,7 +995,7 @@ private TransformWatermarks( * Returns the input watermark of the {@link AppliedPTransform}. */ public Instant getInputWatermark() { - return inputWatermark.get(); + return Preconditions.checkNotNull(inputWatermark.get()); } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java index 06ba7b82f4..1d075c54c2 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java @@ -70,7 +70,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { return watermarks.getInputWatermark(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java index 31927ab882..3dfa06474e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java @@ -138,8 +138,7 @@ public boolean apply(WindowedValue input) { /** Is {@code window} expired w.r.t. the garbage collection watermark? */ private boolean canDropDueToExpiredWindow(BoundedWindow window) { Instant inputWM = timerInternals.currentInputWatermarkTime(); - return inputWM != null - && window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); + return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java index a7818a30a0..9fa36b0eb7 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java @@ -103,7 +103,7 @@ private PaneInfo describePane( boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY; // True is the input watermark hasn't passed the window's max timestamp. - boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp); + boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp); Timing timing; if (isLateForOutput || !onlyEarlyPanesSoFar) { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java index bdbaf1098e..7649d520bb 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java @@ -146,7 +146,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index f1d45828f9..560d8ecd44 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -537,6 +537,10 @@ public void onTimer(TimerData timer) throws Exception { "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } + // If this is an end-of-window timer then, we need to set a GC timer + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + // If this is a garbage collection timer then we should trigger and garbage collect the window. Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); boolean isGarbageCollection = @@ -553,7 +557,7 @@ public void onTimer(TimerData timer) throws Exception { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. - onTrigger(directContext, renamedContext, true/* isFinished */); + onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); } // Cleanup flavor B: Clear all the remaining state for this window since we'll never @@ -569,9 +573,6 @@ public void onTimer(TimerData timer) throws Exception { emitIfAppropriate(directContext, renamedContext); } - // If this is an end-of-window timer then, we need to set a GC timer - boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { // If the window strategy trigger includes a watermark trigger then at this point // there should be no data holds, either because we'd already cleared them on an @@ -676,7 +677,7 @@ private void emitIfAppropriate(ReduceFn.Context directCon // Run onTrigger to produce the actual pane contents. // As a side effect it will clear all element holds, but not necessarily any // end-of-window or garbage collection holds. - onTrigger(directContext, renamedContext, isFinished); + onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/); // Now that we've triggered, the pane is empty. nonEmptyPanes.clearPane(renamedContext.state()); @@ -723,10 +724,9 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing private void onTrigger( final ReduceFn.Context directContext, ReduceFn.Context renamedContext, - boolean isFinished) + boolean isFinished, boolean isEndOfWindow) throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); // Prefetch necessary states ReadableState outputTimestampFuture = @@ -747,7 +747,7 @@ private void onTrigger( final Instant outputTimestamp = pair.oldHold; @Nullable Instant newHold = pair.newHold; - if (newHold != null && inputWM != null) { + if (newHold != null) { // We can't be finished yet. Preconditions.checkState( !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); @@ -825,7 +825,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer( Instant endOfWindow = directContext.window().maxTimestamp(); Instant fireTime; String which; - if (inputWM != null && endOfWindow.isBefore(inputWM)) { + if (endOfWindow.isBefore(inputWM)) { fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness()); which = "garbage collection"; } else { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java index c823ed39b1..b26e6e87b0 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java @@ -79,10 +79,11 @@ public interface TimerInternals { /** * Return the current, local input watermark timestamp for this computation - * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown. + * in the {@link TimeDomain#EVENT_TIME} time domain. * *

This value: *

    + *
  1. Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. *
  2. Is monotonically increasing. *
  3. May differ between workers due to network and other delays. *
  4. Will never be ahead of the global input watermark for this computation. But it @@ -95,7 +96,6 @@ public interface TimerInternals { * it is possible for an element to be considered locally on-time even though it is * globally late. */ - @Nullable Instant currentInputWatermarkTime(); /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java index 7d4b4f2abe..2ddf5245b8 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java @@ -54,7 +54,6 @@ public interface Timers { @Nullable public abstract Instant currentSynchronizedProcessingTime(); - /** Returns the current event time or {@code null} if unknown. */ - @Nullable + /** Returns the current event time. */ public abstract Instant currentEventTime(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java index 64ff402a9a..50e8b324ea 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java @@ -209,7 +209,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timers.currentEventTime(); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index 31e36c5ef3..7f814c457a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -228,7 +228,6 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); // Only add the hold if we can be sure: // - the backend will be able to respect it @@ -242,7 +241,7 @@ private Instant addElementHold(ReduceFn.ProcessValueContext context) if (outputWM != null && elementHold.isBefore(outputWM)) { which = "too late to effect output watermark"; tooLate = true; - } else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM)) { + } else if (context.window().maxTimestamp().isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -288,12 +287,11 @@ private Instant addEndOfWindowHold(ReduceFn.Context context) { // by the end of window (ie the end of window is at or ahead of the input watermark). Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); - if (inputWM != null && eowHold.isBefore(inputWM)) { + if (eowHold.isBefore(inputWM)) { which = "too late for end-of-window timer"; tooLate = true; } else { @@ -332,13 +330,12 @@ private Instant addGarbageCollectionHold(ReduceFn.Context context) { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); - Preconditions.checkNotNull(inputWM); WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM); - Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM), + Preconditions.checkState(!gcHold.isBefore(inputWM), "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM); context.state().access(EXTRA_HOLD_TAG).add(gcHold); return gcHold; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java index 1daadc7e2b..cc609532ba 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java @@ -1,3 +1,4 @@ + /* * Copyright (C) 2016 Google Inc. * diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java index d4620a7827..4aeaa0cb99 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java @@ -599,7 +599,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -666,9 +666,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -692,7 +691,7 @@ public void advanceInputWatermark( ReduceFnRunner runner, Instant newInputWatermark) throws Exception { Preconditions.checkNotNull(newInputWatermark); Preconditions.checkState( - inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + !newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -713,7 +712,6 @@ public void advanceInputWatermark( public void advanceOutputWatermark(Instant newOutputWatermark) { Preconditions.checkNotNull(newOutputWatermark); - Preconditions.checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java index 0c71830202..f291438f8b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java @@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState; import com.google.cloud.dataflow.sdk.values.TimestampedValue; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -428,7 +429,7 @@ private class TestTimerInternals implements TimerInternals { /** Current input watermark. */ @Nullable - private Instant inputWatermarkTime = null; + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; /** Current output watermark. */ @Nullable @@ -471,9 +472,8 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentInputWatermarkTime() { - return inputWatermarkTime; + return Preconditions.checkNotNull(inputWatermarkTime); } @Override @@ -495,7 +495,7 @@ public String toString() { public void advanceInputWatermark(Instant newInputWatermark) throws Exception { checkNotNull(newInputWatermark); - checkState(inputWatermarkTime == null || !newInputWatermark.isBefore(inputWatermarkTime), + checkState(!newInputWatermark.isBefore(inputWatermarkTime), "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, newInputWatermark); WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", @@ -513,7 +513,6 @@ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception { checkNotNull(newOutputWatermark); - checkNotNull(inputWatermarkTime); if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", @@ -577,7 +576,6 @@ public Instant currentSynchronizedProcessingTime() { } @Override - @Nullable public Instant currentEventTime() { return timerInternals.currentInputWatermarkTime(); } From d665051a0c7e491694d8c8a59e27191a258904b0 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 22 Mar 2016 10:43:56 -0700 Subject: [PATCH 4/4] Integrate DisplayData into DataflowPipelineRunner --- .../runners/DataflowPipelineTranslator.java | 12 +++ .../sdk/transforms/display/DisplayData.java | 15 ++- .../dataflow/sdk/util/PropertyNames.java | 1 + .../DataflowPipelineTranslatorTest.java | 98 ++++++++++++++++++- .../transforms/display/DisplayDataTest.java | 2 +- 5 files changed, 122 insertions(+), 6 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index 0feae957f8..155c454b38 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -59,6 +59,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.util.AppliedCombineFn; @@ -79,6 +80,7 @@ import com.google.cloud.dataflow.sdk.values.TypedPValue; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -548,6 +550,7 @@ public void addStep(PTransform transform, String type) { currentStep.setKind(type); steps.add(currentStep); addInput(PropertyNames.USER_NAME, getFullName(transform)); + addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform)); } @Override @@ -725,6 +728,15 @@ private void addOutput(String name, PValue value, Coder valueCoder) { outputInfoList.add(outputInfo); } + private void addDisplayData(String name, DisplayData displayData) { + List> serializedItems = Lists.newArrayList(); + for (DisplayData.Item item : displayData.items()) { + serializedItems.add(MAPPER.convertValue(item, Map.class)); + } + + addList(getProperties(), name, serializedItems); + } + @Override public OutputReference asOutputReference(PValue value) { AppliedPTransform transform = diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java index 05fa7c7881..dadc7309da 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java @@ -26,6 +26,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonInclude; + import org.apache.avro.reflect.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -214,10 +217,12 @@ private Item( this.label = label; } + @JsonGetter("namespace") public String getNamespace() { return ns; } + @JsonGetter("key") public String getKey() { return key; } @@ -226,6 +231,7 @@ public String getKey() { * Retrieve the {@link DisplayData.Type} of display metadata. All metadata conforms to a * predefined set of allowed types. */ + @JsonGetter("type") public Type getType() { return type; } @@ -233,6 +239,7 @@ public Type getType() { /** * Retrieve the value of the metadata item. */ + @JsonGetter("value") public String getValue() { return value; } @@ -244,6 +251,8 @@ public String getValue() { *

    Some display data types will not provide a short value, in which case the return value * will be null. */ + @JsonGetter("shortValue") + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable public String getShortValue() { return shortValue; @@ -255,6 +264,8 @@ public String getShortValue() { * *

    If no label was specified, this will return {@code null}. */ + @JsonGetter("label") + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable public String getLabel() { return label; @@ -266,8 +277,10 @@ public String getLabel() { * *

    If no URL was specified, this will return {@code null}. */ + @JsonGetter("linkUrl") + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - public String getUrl() { + public String getLinkUrl() { return url; } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java index ec6518976b..81572ea207 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java @@ -103,4 +103,5 @@ public class PropertyNames { public static final String VALIDATE_SINK = "validate_sink"; public static final String VALIDATE_SOURCE = "validate_source"; public static final String VALUE = "value"; + public static final String DISPLAY_DATA = "display_data"; } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index 497552f901..65f8dde266 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -20,9 +20,11 @@ import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary; import static com.google.cloud.dataflow.sdk.util.Structs.getString; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -53,6 +55,7 @@ import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.Sum; import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.GcsUtil; import com.google.cloud.dataflow.sdk.util.OutputReference; import com.google.cloud.dataflow.sdk.util.PropertyNames; @@ -81,6 +84,8 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -91,9 +96,9 @@ * Tests for DataflowPipelineTranslator. */ @RunWith(JUnit4.class) -public class DataflowPipelineTranslatorTest { +public class DataflowPipelineTranslatorTest implements Serializable { - @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); // A Custom Mockito matcher for an initial Job that checks that all // expected fields are set. @@ -496,7 +501,7 @@ private static Step createPredefinedStep() throws Exception { return step; } - private static class NoOpFn extends DoFn{ + private static class NoOpFn extends DoFn { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -796,4 +801,89 @@ public void testToIterableTranslationWithIsmSideInput() throws Exception { Step collectionToSingletonStep = steps.get(2); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } + + @Test + public void testStepDisplayData() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + DataflowPipeline pipeline = DataflowPipeline.create(options); + + DoFn fn1 = new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("foo", "bar") + .add("foo2", DataflowPipelineTranslatorTest.class) + .withLabel("Test Class") + .withLinkUrl("http://www.google.com"); + } + }; + + DoFn fn2 = new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo3", "barge"); + } + }; + + pipeline + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(fn1)) + .apply(ParDo.of(fn2)); + + Job job = translator.translate( + pipeline, pipeline.getRunner(), Collections.emptyList()).getJob(); + + List steps = job.getSteps(); + assertEquals(3, steps.size()); + + Map parDo1Properties = steps.get(1).getProperties(); + Map parDo2Properties = steps.get(2).getProperties(); + assertThat(parDo1Properties, hasKey("display_data")); + + Collection> fn1displayData = + (Collection>) parDo1Properties.get("display_data"); + Collection> fn2displayData = + (Collection>) parDo2Properties.get("display_data"); + + ImmutableList expectedFn1DisplayData = ImmutableList.of( + ImmutableMap.builder() + .put("namespace", fn1.getClass().getName()) + .put("key", "foo") + .put("type", "STRING") + .put("value", "bar") + .build(), + ImmutableMap.builder() + .put("namespace", fn1.getClass().getName()) + .put("key", "foo2") + .put("type", "JAVA_CLASS") + .put("value", DataflowPipelineTranslatorTest.class.getName()) + .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName()) + .put("label", "Test Class") + .put("linkUrl", "http://www.google.com") + .build() + ); + + ImmutableList expectedFn2DisplayData = ImmutableList.of( + ImmutableMap.builder() + .put("namespace", fn2.getClass().getName()) + .put("key", "foo3") + .put("type", "STRING") + .put("value", "barge") + .build() + ); + + assertEquals(expectedFn1DisplayData, fn1displayData); + assertEquals(expectedFn2DisplayData, fn2displayData); + } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java index 13dd618657..dfc8c38f52 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java @@ -606,7 +606,7 @@ private static Matcher hasUrl(Matcher urlMatcher) { urlMatcher, "display item with url", "URL") { @Override protected String featureValueOf(DisplayData.Item actual) { - return actual.getUrl(); + return actual.getLinkUrl(); } }; }