diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index ac1fa43603e3..0f609dfc0337 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ExecutableTrigger; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import org.joda.time.Instant; @@ -112,4 +113,13 @@ public void onOnlyFiring(TriggerContext context) throws Exception { subtrigger.invokeOnFire(context); } } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterAll.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java index 83e0beadf68a..7ec3ce9db0dd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java @@ -36,10 +36,12 @@ import org.joda.time.Duration; import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; +import org.joda.time.format.PeriodFormatter; import java.util.List; +import java.util.Locale; import java.util.Objects; - import javax.annotation.Nullable; /** @@ -59,6 +61,8 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger { StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "delayed", InstantCoder.of(), Min.MinFn.naturalOrder())); + private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); + /** * To complete an implementation, return the desired time from the TriggerContext. */ @@ -276,6 +280,11 @@ public boolean equals(Object object) { public int hashCode() { return Objects.hash(delay); } + + @Override + public String toString() { + return PERIOD_FORMATTER.print(delay.toPeriod()); + } } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 0c80851f3d83..59cb73ce150c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.util.ExecutableTrigger; +import com.google.common.base.Joiner; import org.joda.time.Instant; import java.util.Arrays; @@ -127,6 +128,15 @@ public void onFire(Trigger.TriggerContext context) throws Exception { updateFinishedState(context); } + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterEach.inOrder("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + private void updateFinishedState(TriggerContext context) { context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index 1462ec485033..a8508a3fc8ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ExecutableTrigger; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import org.joda.time.Instant; @@ -108,6 +109,15 @@ protected void onOnlyFiring(TriggerContext context) throws Exception { } } + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterFirst.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + private void updateFinishedStatus(TriggerContext c) { boolean anyFinished = false; for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index b2ea1b42cce3..05c681562ec4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Objects; - import javax.annotation.Nullable; /** @@ -74,7 +73,15 @@ protected Trigger getContinuationTrigger(List continuationTriggers) { @Override public String toString() { - return "AfterProcessingTime.pastFirstElementInPane(" + timestampMappers + ")"; + StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()"); + for (SerializableFunction delayFn : timestampMappers) { + builder + .append(".plusDelayOf(") + .append(delayFn) + .append(")"); + } + + return builder.toString(); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 05c6eb883657..e48cc4435b0a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -63,6 +63,8 @@ @Experimental(Experimental.Kind.TRIGGER) public class AfterWatermark { + private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()"; + // Static factory class. private AfterWatermark() {} @@ -220,6 +222,26 @@ public void onFire(Trigger.TriggerContext context) throws Exception { } } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(TO_STRING); + + Trigger earlyTrigger = subTriggers.get(EARLY_INDEX); + if (!(earlyTrigger instanceof Never.NeverTrigger)) { + builder + .append(".withEarlyFirings(") + .append(earlyTrigger) + .append(")"); + } + + builder + .append(".withLateFirings(") + .append(subTriggers.get(LATE_INDEX)) + .append(")"); + + return builder.toString(); + } + private void onNonLateFiring(Trigger.TriggerContext context) throws Exception { // We have not yet transitioned to late firings. ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); @@ -328,7 +350,7 @@ public FromEndOfWindow getContinuationTrigger(List continuationTriggers @Override public String toString() { - return "AfterWatermark.pastEndOfWindow()"; + return TO_STRING; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java index 794d5fd3283e..86565370eef6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -35,6 +36,8 @@ */ public class CalendarWindows { + private static final DateTime DEFAULT_START_DATE = new DateTime(0, DateTimeZone.UTC); + /** * Returns a {@link WindowFn} that windows elements into periods measured by days. * @@ -42,7 +45,7 @@ public class CalendarWindows { * separate windows for each day. */ public static DaysWindows days(int number) { - return new DaysWindows(number, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC); + return new DaysWindows(number, DEFAULT_START_DATE, DateTimeZone.UTC); } /** @@ -54,7 +57,7 @@ public static DaysWindows days(int number) { public static DaysWindows weeks(int number, int startDayOfWeek) { return new DaysWindows( 7 * number, - new DateTime(0, DateTimeZone.UTC).withDayOfWeek(startDayOfWeek), + DEFAULT_START_DATE.withDayOfWeek(startDayOfWeek), DateTimeZone.UTC); } @@ -67,7 +70,7 @@ public static DaysWindows weeks(int number, int startDayOfWeek) { * and the first window begins in January 2014. */ public static MonthsWindows months(int number) { - return new MonthsWindows(number, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC); + return new MonthsWindows(number, 1, DEFAULT_START_DATE, DateTimeZone.UTC); } /** @@ -79,7 +82,7 @@ public static MonthsWindows months(int number) { * America/Los_Angeles time zone. */ public static YearsWindows years(int number) { - return new YearsWindows(number, 1, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC); + return new YearsWindows(number, 1, 1, DEFAULT_START_DATE, DateTimeZone.UTC); } /** @@ -142,6 +145,14 @@ public boolean isCompatible(WindowFn other) { && timeZone == that.timeZone; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numDays", number) + .addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(), + new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant()); + } + public int getNumber() { return number; } @@ -229,6 +240,14 @@ public boolean isCompatible(WindowFn other) { && timeZone == that.timeZone; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numMonths", number) + .addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(), + new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant()); + } + public int getNumber() { return number; } @@ -325,6 +344,14 @@ public boolean isCompatible(WindowFn other) { && timeZone == that.timeZone; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numYears", number) + .addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(), + new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant()); + } + public DateTimeZone getTimeZone() { return timeZone; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java index cc4388783b14..bba1f3beb68c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -82,6 +83,13 @@ public IntervalWindow assignWindow(Instant timestamp) { return new IntervalWindow(new Instant(start), size); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("size", size) + .addIfNotDefault("offset", offset, Duration.ZERO); + } + @Override public Coder windowCoder() { return IntervalWindow.getCoder(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 809e841ebb27..8e3e664b0514 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -41,7 +41,8 @@ public static OnceTrigger ever() { return new NeverTrigger(); } - private static class NeverTrigger extends OnceTrigger { + // package-private in order to check identity for string formatting. + static class NeverTrigger extends OnceTrigger { protected NeverTrigger() { super(null); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 48e1dc255a2c..c48f5f47daca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -93,6 +93,11 @@ public void onFire(Trigger.TriggerContext context) throws Exception { updateFinishedState(context); } + @Override + public String toString() { + return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL)); + } + private void updateFinishedState(TriggerContext c) throws Exception { boolean anyStillFinished = false; for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 414f1070bd39..ec79cf93dabf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -40,7 +40,7 @@ public class Repeatedly extends Trigger { private static final int REPEATED = 0; /** - * Create a composite trigger that repeatedly executes the trigger {@code toRepeat}, firing each + * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each * time it fires and ignoring any indications to finish. * *

Unless used with {@link Trigger#orFinally} the composite trigger will never finish. @@ -92,6 +92,11 @@ public void onFire(TriggerContext context) throws Exception { } } + @Override + public String toString() { + return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); + } + private ExecutableTrigger getRepeated(TriggerContext context) { return context.trigger().subTrigger(REPEATED); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java index 3be6454cbd1c..74ca26800066 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; @@ -97,6 +98,11 @@ public Duration getGapDuration() { return gapDuration; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("gapDuration", gapDuration); + } + @Override public boolean equals(Object object) { if (!(object instanceof Sessions)) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java index 3a7b0720610a..abb407849089 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -139,6 +140,14 @@ public boolean isCompatible(WindowFn other) { return equals(other); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("size", size) + .add("period", period) + .add("offset", offset); + } + /** * Return the last start of a sliding window that contains the timestamp. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 2c162f2ff432..da512b815907 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.PCollection; @@ -595,6 +596,30 @@ public PCollection apply(PCollection input) { input.getPipeline(), outputStrategy, input.isBounded()); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("windowFn", windowFn.getClass()) + .include(windowFn) + .addIfNotNull("allowedLateness", allowedLateness); + + if (trigger != null && !(trigger instanceof DefaultTrigger)) { + builder.add("trigger", trigger.toString()); + } + + if (mode != null) { + builder.add("accumulationMode", mode.toString()); + } + + if (closingBehavior != null) { + builder.add("closingBehavior", closingBehavior.toString()); + } + + if (outputTimeFn != null) { + builder.add("outputTimeFn", outputTimeFn.getClass()); + } + } + @Override protected Coder getDefaultOutputCoder(PCollection input) { return input.getCoder(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 1dda7d0c7f3a..2eac93612124 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -20,6 +20,8 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import com.google.common.collect.Ordering; @@ -50,7 +52,7 @@ * windows used by this {@code WindowFn} */ public abstract class WindowFn - implements Serializable { + implements Serializable, HasDisplayData { /** * Information available when running {@link #assignWindows}. */ @@ -178,6 +180,16 @@ public boolean assignsToSingleWindow() { return false; } + /** + * {@inheritDoc} + * + *

By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } + /** * A compatibility adapter that will return the assigned timestamps according to the * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java index e5168b26448b..0a47634970fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java @@ -60,4 +60,9 @@ public boolean shouldFire(Trigger.TriggerContext context) throws Exception { @Override public void onFire(Trigger.TriggerContext context) throws Exception { } + + @Override + public String toString() { + return "ReshuffleTrigger()"; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java index 9ab4e5c7cbbe..969c1fe722cf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java @@ -149,4 +149,9 @@ public void testContinuation() throws Exception { afterAll.getContinuationTrigger()); } + @Test + public void testToString() { + Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2")); + assertEquals("AfterAll.of(t1, t2)", trigger.toString()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java index d3d43c97b78d..f5d83a708b3f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java @@ -120,4 +120,14 @@ public void testContinuation() throws Exception { trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())), afterEach.getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = AfterEach.inOrder( + StubTrigger.named("t1"), + StubTrigger.named("t2"), + StubTrigger.named("t3")); + + assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java index d70c4d6a950d..c0a9f2be3d02 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java @@ -173,4 +173,10 @@ public void testContinuation() throws Exception { AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), afterFirst.getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2")); + assertEquals("AfterFirst.of(t1, t2)", trigger.toString()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java index eeda9ed7b343..827d4c63907f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java @@ -124,4 +124,10 @@ public void testContinuation() throws Exception { AfterPane.elementCountAtLeast(1), AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = AfterPane.elementCountAtLeast(5); + assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java index 4245e786f1e4..81aad3389ff5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -155,4 +155,35 @@ public void testCompatibilityIdentical() throws Exception { .plusDelayOf(Duration.standardMinutes(1L)); assertTrue(t1.isCompatible(t2)); } + + @Test + public void testToString() { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); + assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString()); + } + + @Test + public void testWithDelayToString() { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(5)); + + assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)", + trigger.toString()); + } + + @Test + public void testBuiltUpToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow() + .withLateFirings(AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(10))) + .buildTrigger(); + + String expected = "AfterWatermark.pastEndOfWindow()" + + ".withLateFirings(AfterProcessingTime" + + ".pastFirstElementInPane()" + + ".plusDelayOf(10 minutes))"; + + assertEquals(expected, trigger.toString()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java index 00e2d2274324..ef8471407521 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms.windowing; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doNothing; @@ -336,4 +337,30 @@ public void testEarlyAndLateOnMergeRewinds() throws Exception { tester.advanceInputWatermark(new Instant(15)); assertTrue(tester.shouldFire(mergedWindow)); } + + @Test + public void testFromEndOfWindowToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow(); + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } + + @Test + public void testLateFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow() + .withLateFirings(StubTrigger.named("t1")) + .buildTrigger(); + + assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString()); + } + + @Test + public void testEarlyAndLateFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow() + .withEarlyFirings(StubTrigger.named("t1")) + .withLateFirings(StubTrigger.named("t2")) + .buildTrigger(); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)", + trigger.toString()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java index 31e9a9478d31..4598a2777c11 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java @@ -19,9 +19,13 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import org.apache.beam.sdk.transforms.display.DisplayData; + import org.joda.time.DateTime; import org.joda.time.DateTimeConstants; import org.joda.time.DateTimeZone; @@ -259,4 +263,31 @@ public void testTimeZone() throws Exception { CalendarWindows.days(1).withTimeZone(timeZone), timestamps)); } + + @Test + public void testDisplayData() { + DateTimeZone timeZone = DateTimeZone.forID("America/Los_Angeles"); + Instant jan1 = new DateTime(1990, 1, 1, 0, 0, timeZone).toInstant(); + + CalendarWindows.DaysWindows daysWindow = CalendarWindows.days(5) + .withStartingDay(1990, 1, 1) + .withTimeZone(timeZone); + DisplayData daysDisplayData = DisplayData.from(daysWindow); + assertThat(daysDisplayData, hasDisplayItem("numDays", 5)); + assertThat(daysDisplayData, hasDisplayItem("startDate", jan1)); + + CalendarWindows.MonthsWindows monthsWindow = CalendarWindows.months(2) + .withStartingMonth(1990, 1) + .withTimeZone(timeZone); + DisplayData monthsDisplayData = DisplayData.from(monthsWindow); + assertThat(monthsDisplayData, hasDisplayItem("numMonths", 2)); + assertThat(monthsDisplayData, hasDisplayItem("startDate", jan1)); + + CalendarWindows.YearsWindows yearsWindow = CalendarWindows.years(4) + .withStartingYear(1990) + .withTimeZone(timeZone); + DisplayData yearsDisplayData = DisplayData.from(yearsWindow); + assertThat(yearsDisplayData, hasDisplayItem("numYears", 4)); + assertThat(yearsDisplayData, hasDisplayItem("startDate", jan1)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java index c8ee9ace648f..fc1caac40c09 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; @@ -28,6 +29,7 @@ import static org.junit.Assert.fail; import org.apache.beam.sdk.testing.WindowFnTestUtils; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -123,4 +125,16 @@ public void testValidOutputTimes() throws Exception { FixedWindows.of(new Duration(500)), timestamp); } } + + @Test + public void testDisplayData() { + Duration offset = Duration.standardSeconds(1234); + Duration size = Duration.standardSeconds(2345); + + FixedWindows fixedWindows = FixedWindows.of(size).withOffset(offset); + DisplayData displayData = DisplayData.from(fixedWindows); + + assertThat(displayData, hasDisplayItem("size", size)); + assertThat(displayData, hasDisplayItem("offset", offset)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java index e4e9cb3785d3..ea178a8b3128 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java @@ -207,4 +207,10 @@ public void testContinuation() throws Exception { triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())), bOrFinallyA.getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2")); + assertEquals("t1.orFinally(t2)", trigger.toString()); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java index 9b74767b38f4..ddb9f9a39b77 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java @@ -209,4 +209,17 @@ public void testRepeatedlyProcessingTime() throws Exception { assertFalse(tester.shouldFire(window)); } + + @Test + public void testToString() { + Trigger trigger = Repeatedly.forever(new StubTrigger() { + @Override + public String toString() { + return "innerTrigger"; + } + }); + + assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString()); + } + } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java index 932d416f27fd..a543359e9c76 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -27,6 +28,7 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.testing.WindowFnTestUtils; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.collect.ImmutableList; @@ -155,4 +157,12 @@ public void testValidOutputAtEndTimes() throws Exception { (List) ImmutableList.of(1L, 3L), (List) ImmutableList.of(0L, 5L, 10L, 15L, 20L))); } + + @Test + public void testDisplayData() { + Duration gapDuration = Duration.standardMinutes(234); + Sessions session = Sessions.withGapDuration(gapDuration); + assertThat(DisplayData.from(session), + hasDisplayItem("gapDuration", gapDuration)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java index a310cb3bfab1..047a413242b5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java @@ -19,12 +19,15 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.testing.WindowFnTestUtils; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -192,4 +195,21 @@ public void testOutputTimesNonInterference() throws Exception { SlidingWindows.of(new Duration(1000)).every(new Duration(500)), timestamp); } } + + @Test + public void testDisplayData() { + Duration windowSize = Duration.standardSeconds(1234); + Duration offset = Duration.standardSeconds(2345); + Duration period = Duration.standardSeconds(3456); + + SlidingWindows slidingWindowFn = SlidingWindows + .of(windowSize) + .every(period) + .withOffset(offset); + + DisplayData displayData = DisplayData.from(slidingWindowFn); + assertThat(displayData, hasDisplayItem("size", windowSize)); + assertThat(displayData, hasDisplayItem("period", period)); + assertThat(displayData, hasDisplayItem("offset", offset)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java new file mode 100644 index 000000000000..738c0bcf7b3d --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.api.client.util.Lists; +import org.joda.time.Instant; + +import java.util.List; + +/** + * No-op {@link OnceTrigger} implementation for testing. + */ +abstract class StubTrigger extends Trigger.OnceTrigger { + /** + * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}. + */ + static StubTrigger named(final String name) { + return new StubTrigger() { + @Override + public String toString() { + return name; + } + }; + } + + protected StubTrigger() { + super(Lists.newArrayList()); + } + + @Override + protected void onOnlyFiring(TriggerContext context) throws Exception { + } + + @Override + public void onElement(OnElementContext c) throws Exception { + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + } + + @Override + public boolean shouldFire(TriggerContext context) throws Exception { + return false; + } + + @Override + protected Trigger getContinuationTrigger(List continuationTriggers) { + return null; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return null; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java index a8287a3dbfc1..43c8bd8c5d76 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java @@ -38,7 +38,7 @@ public class TriggerTest { @Test public void testTriggerToString() throws Exception { assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString()); - assertEquals("Repeatedly(AfterWatermark.pastEndOfWindow())", + assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())", Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString()); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 359ad23071e4..6be6df819a91 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -17,7 +17,13 @@ */ package org.apache.beam.sdk.transforms.windowing; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; + import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -33,6 +39,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.KV; @@ -224,4 +231,56 @@ public void processElement(ProcessContext c) throws Exception { pipeline.run(); } + + @Test + public void testDisplayData() { + FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5)); + AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); + Duration allowedLateness = Duration.standardMinutes(10); + Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; + OutputTimeFn outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); + + Window.Bound window = Window + .into(windowFn) + .triggering(triggerBuilder) + .accumulatingFiredPanes() + .withAllowedLateness(allowedLateness, closingBehavior) + .withOutputTimeFn(outputTimeFn); + + DisplayData displayData = DisplayData.from(window); + + assertThat(displayData, hasDisplayItem("windowFn", windowFn.getClass())); + assertThat(displayData, includes(windowFn)); + + assertThat(displayData, hasDisplayItem("trigger", triggerBuilder.toString())); + assertThat(displayData, + hasDisplayItem("accumulationMode", AccumulationMode.ACCUMULATING_FIRED_PANES.toString())); + assertThat(displayData, + hasDisplayItem("allowedLateness", allowedLateness)); + assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); + assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass())); + } + + @Test + public void testDisplayDataExcludesUnspecifiedProperties() { + Window.Bound window = Window.into(new GlobalWindows()); + + DisplayData displayData = DisplayData.from(window); + assertThat(displayData, not(hasDisplayItem(hasKey(isOneOf( + "trigger", + "outputTimeFn", + "accumulationMode", + "allowedLateness", + "closingBehavior"))))); + + } + + @Test + public void testDisplayDataExcludesDefaultTrigger() { + Window.Bound window = Window.into(new GlobalWindows()) + .triggering(DefaultTrigger.of()); + + DisplayData data = DisplayData.from(window); + assertThat(data, not(hasDisplayItem(hasKey("trigger")))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java index cabde7f8c594..b17ce81cf482 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java @@ -17,12 +17,14 @@ */ package org.apache.beam.sdk.util; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; import org.joda.time.Duration; import org.joda.time.Instant; @@ -36,7 +38,7 @@ @RunWith(JUnit4.class) public class ReshuffleTriggerTest { - /** Public so that other tests can instantiate ReshufleTrigger. */ + /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */ public static ReshuffleTrigger forTest() { return new ReshuffleTrigger<>(); } @@ -57,4 +59,10 @@ public void testOnTimer() throws Exception { tester.fireIfShouldFire(arbitraryWindow); assertFalse(tester.isMarkedFinished(arbitraryWindow)); } + + @Test + public void testToString() { + Trigger trigger = new ReshuffleTrigger<>(); + assertEquals("ReshuffleTrigger()", trigger.toString()); + } }