diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java index a315692d9c0a..97f561658fe8 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java @@ -29,14 +29,11 @@ /** * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired. - * - * @param {@link BoundedWindow} subclass used to represent the windows used by this - * {@code Trigger} */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterAll extends OnceTrigger { +public class AfterAll extends OnceTrigger { - private AfterAll(List> subTriggers) { + private AfterAll(List subTriggers) { super(subTriggers); Preconditions.checkArgument(subTriggers.size() > 1); } @@ -45,14 +42,13 @@ private AfterAll(List> subTriggers) { * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static OnceTrigger of( - OnceTrigger... triggers) { - return new AfterAll(Arrays.>asList(triggers)); + public static OnceTrigger of(OnceTrigger... triggers) { + return new AfterAll(Arrays.asList(triggers)); } @Override public void onElement(OnElementContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) { // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH. // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH. subTrigger.invokeOnElement(c); @@ -61,21 +57,21 @@ public void onElement(OnElementContext c) throws Exception { @Override public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { subTrigger.invokeOnMerge(c); } boolean allFinished = true; - for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) { allFinished &= c.forTrigger(subTrigger1).trigger().isFinished(); } c.trigger().setFinished(allFinished); } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the latest of its sub-triggers. Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE; - for (Trigger subTrigger : subTriggers) { + for (Trigger subTrigger : subTriggers) { Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window); if (deadline.isBefore(subDeadline)) { deadline = subDeadline; @@ -85,8 +81,8 @@ public Instant getWatermarkThatGuaranteesFiring(W window) { } @Override - public OnceTrigger getContinuationTrigger(List> continuationTriggers) { - return new AfterAll(continuationTriggers); + public OnceTrigger getContinuationTrigger(List continuationTriggers) { + return new AfterAll(continuationTriggers); } /** @@ -96,7 +92,7 @@ public OnceTrigger getContinuationTrigger(List> continuationTrigge */ @Override public boolean shouldFire(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { if (!context.forTrigger(subtrigger).trigger().isFinished() && !subtrigger.invokeShouldFire(context)) { return false; @@ -111,7 +107,7 @@ public boolean shouldFire(TriggerContext context) throws Exception { */ @Override public void onOnlyFiring(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { subtrigger.invokeOnFire(context); } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java index bc626b2a0593..2143b0c81dc7 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java @@ -48,7 +48,7 @@ *

This class is for internal use only and may change at any time. */ @Experimental(Experimental.Kind.TRIGGER) -public abstract class AfterDelayFromFirstElement extends OnceTrigger { +public abstract class AfterDelayFromFirstElement extends OnceTrigger { protected static final List> IDENTITY = ImmutableList.>of(); @@ -62,14 +62,14 @@ public abstract class AfterDelayFromFirstElement extend * To complete an implementation, return the desired time from the TriggerContext. */ @Nullable - public abstract Instant getCurrentTime(Trigger.TriggerContext context); + public abstract Instant getCurrentTime(Trigger.TriggerContext context); /** * To complete an implementation, return a new instance like this one, but incorporating * the provided timestamp mapping functions. Generally should be used by calling the * constructor of this class from the constructor of the subclass. */ - protected abstract AfterDelayFromFirstElement newWith( + protected abstract AfterDelayFromFirstElement newWith( List> transform); /** @@ -100,7 +100,7 @@ private Instant getTargetTimestamp(OnElementContext c) { *

TODO: Consider sharing this with FixedWindows, and bring over the equivalent of * CalendarWindows. */ - public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) { + public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) { return newWith(new AlignFn(size, offset)); } @@ -108,7 +108,7 @@ public AfterDelayFromFirstElement alignedTo(final Duration size, final Instan * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp * since the epoch. */ - public AfterDelayFromFirstElement alignedTo(final Duration size) { + public AfterDelayFromFirstElement alignedTo(final Duration size) { return alignedTo(size, new Instant(0)); } @@ -118,7 +118,7 @@ public AfterDelayFromFirstElement alignedTo(final Duration size) { * @param delay the delay to add * @return An updated time trigger that will wait the additional time before firing. */ - public AfterDelayFromFirstElement plusDelayOf(final Duration delay) { + public AfterDelayFromFirstElement plusDelayOf(final Duration delay) { return newWith(new DelayFn(delay)); } @@ -127,22 +127,22 @@ public AfterDelayFromFirstElement plusDelayOf(final Duration delay) { * {@link #plusDelayOf} and {@link #alignedTo}. */ @Deprecated - public OnceTrigger mappedTo(SerializableFunction timestampMapper) { + public OnceTrigger mappedTo(SerializableFunction timestampMapper) { return newWith(timestampMapper); } @Override - public boolean isCompatible(Trigger other) { + public boolean isCompatible(Trigger other) { if (!getClass().equals(other.getClass())) { return false; } - AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other; + AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other; return this.timestampMappers.equals(that.timestampMappers); } - private AfterDelayFromFirstElement newWith( + private AfterDelayFromFirstElement newWith( SerializableFunction timestampMapper) { return newWith( ImmutableList.>builder() @@ -173,7 +173,7 @@ public void onElement(OnElementContext c) throws Exception { } @Override - public void prefetchOnMerge(MergingStateAccessor state) { + public void prefetchOnMerge(MergingStateAccessor state) { super.prefetchOnMerge(state); StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG); } @@ -218,12 +218,12 @@ public void clear(TriggerContext c) throws Exception { } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read(); return delayedUntil != null && getCurrentTime(context) != null @@ -231,7 +231,7 @@ && getCurrentTime(context) != null } @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { + protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { clear(context); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java index 164cd87583e1..e35cc1569869 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java @@ -42,14 +42,11 @@ *

  • {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as * {@code Repeatedly.forever(a)}, since the repeated trigger never finishes. * - * - * @param {@link BoundedWindow} subclass used to represent the windows used by this - * {@code Trigger} */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterEach extends Trigger { +public class AfterEach extends Trigger { - private AfterEach(List> subTriggers) { + private AfterEach(List subTriggers) { super(subTriggers); checkArgument(subTriggers.size() > 1); } @@ -58,8 +55,8 @@ private AfterEach(List> subTriggers) { * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static Trigger inOrder(Trigger... triggers) { - return new AfterEach(Arrays.>asList(triggers)); + public static Trigger inOrder(Trigger... triggers) { + return new AfterEach(Arrays.asList(triggers)); } @Override @@ -69,7 +66,7 @@ public void onElement(OnElementContext c) throws Exception { c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); } else { // If merges are possible, we need to run all subtriggers in parallel - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { // Even if the subTrigger is done, it may be revived via merging and must have // adequate state. subTrigger.invokeOnElement(c); @@ -86,7 +83,7 @@ public void onMerge(OnMergeContext context) throws Exception { // also automatic because they are cleared whenever this trigger // fires. boolean priorTriggersAllFinished = true; - for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { if (priorTriggersAllFinished) { subTrigger.invokeOnMerge(context); priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished(); @@ -98,31 +95,31 @@ public void onMerge(OnMergeContext context) throws Exception { } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire at least once when the first trigger in the sequence // fires at least once. return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window); } @Override - public Trigger getContinuationTrigger(List> continuationTriggers) { - return Repeatedly.forever(new AfterFirst(continuationTriggers)); + public Trigger getContinuationTrigger(List continuationTriggers) { + return Repeatedly.forever(new AfterFirst(continuationTriggers)); } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger(); + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger(); return firstUnfinished.invokeShouldFire(context); } @Override - public void onFire(Trigger.TriggerContext context) throws Exception { + public void onFire(Trigger.TriggerContext context) throws Exception { context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context); // Reset all subtriggers if in a merging context; any may be revived by merging so they are // all run in parallel for each pending pane. if (context.trigger().isMerging()) { - for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { subTrigger.invokeClear(context); } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java index d6817ea64560..b7d491e2b769 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java @@ -30,14 +30,11 @@ /** * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have * fired. - * - * @param {@link BoundedWindow} subclass used to represent the windows used by this - * {@code Trigger} */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterFirst extends OnceTrigger { +public class AfterFirst extends OnceTrigger { - AfterFirst(List> subTriggers) { + AfterFirst(List subTriggers) { super(subTriggers); Preconditions.checkArgument(subTriggers.size() > 1); } @@ -46,31 +43,31 @@ public class AfterFirst extends OnceTrigger { * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static OnceTrigger of( - OnceTrigger... triggers) { - return new AfterFirst(Arrays.>asList(triggers)); + public static OnceTrigger of( + OnceTrigger... triggers) { + return new AfterFirst(Arrays.asList(triggers)); } @Override public void onElement(OnElementContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { subTrigger.invokeOnElement(c); } } @Override public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { subTrigger.invokeOnMerge(c); } updateFinishedStatus(c); } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the earliest of its sub-triggers. Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (Trigger subTrigger : subTriggers) { + for (Trigger subTrigger : subTriggers) { Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window); if (deadline.isAfter(subDeadline)) { deadline = subDeadline; @@ -80,13 +77,13 @@ public Instant getWatermarkThatGuaranteesFiring(W window) { } @Override - public OnceTrigger getContinuationTrigger(List> continuationTriggers) { - return new AfterFirst(continuationTriggers); + public OnceTrigger getContinuationTrigger(List continuationTriggers) { + return new AfterFirst(continuationTriggers); } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { if (context.forTrigger(subtrigger).trigger().isFinished() || subtrigger.invokeShouldFire(context)) { return true; @@ -97,7 +94,7 @@ public boolean shouldFire(Trigger.TriggerContext context) throws Exception { @Override protected void onOnlyFiring(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { TriggerContext subContext = context.forTrigger(subtrigger); if (subtrigger.invokeShouldFire(subContext)) { // If the trigger is ready to fire, then do whatever it needs to do. @@ -112,7 +109,7 @@ protected void onOnlyFiring(TriggerContext context) throws Exception { private void updateFinishedStatus(TriggerContext c) { boolean anyFinished = false; - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { anyFinished |= c.forTrigger(subTrigger).trigger().isFinished(); } c.trigger().setFinished(anyFinished); diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java index 94f43acd7022..76136f7e30fd 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java @@ -35,12 +35,9 @@ /** * {@link Trigger}s that fire based on properties of the elements in the current pane. - * - * @param {@link BoundedWindow} subclass used to represent the windows used by this - * {@link Trigger} */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterPane extends OnceTrigger{ +public class AfterPane extends OnceTrigger { private static final StateTag> ELEMENTS_IN_PANE_TAG = @@ -57,8 +54,8 @@ private AfterPane(int countElems) { /** * Creates a trigger that fires when the pane contains at least {@code countElems} elements. */ - public static AfterPane elementCountAtLeast(int countElems) { - return new AfterPane<>(countElems); + public static AfterPane elementCountAtLeast(int countElems) { + return new AfterPane(countElems); } @Override @@ -67,7 +64,7 @@ public void onElement(OnElementContext c) throws Exception { } @Override - public void prefetchOnMerge(MergingStateAccessor state) { + public void prefetchOnMerge(MergingStateAccessor state) { super.prefetchOnMerge(state); StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG); } @@ -92,7 +89,7 @@ public void prefetchShouldFire(StateAccessor state) { } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { long count = context.state().access(ELEMENTS_IN_PANE_TAG).read(); return count >= countElems; } @@ -103,17 +100,17 @@ public void clear(TriggerContext c) throws Exception { } @Override - public boolean isCompatible(Trigger other) { + public boolean isCompatible(Trigger other) { return this.equals(other); } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } @Override - public OnceTrigger getContinuationTrigger(List> continuationTriggers) { + public OnceTrigger getContinuationTrigger(List continuationTriggers) { return AfterPane.elementCountAtLeast(1); } @@ -130,7 +127,7 @@ public boolean equals(Object obj) { if (!(obj instanceof AfterPane)) { return false; } - AfterPane that = (AfterPane) obj; + AfterPane that = (AfterPane) obj; return this.countElems == that.countElems; } @@ -140,7 +137,7 @@ public int hashCode() { } @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { + protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { clear(context); } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java index 6af6001d35c5..7a82d9c7d31c 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java @@ -34,15 +34,13 @@ * *

    The time at which to fire the timer can be adjusted via the methods in {@link TimeTrigger}, * such as {@link TimeTrigger#plusDelayOf} or {@link TimeTrigger#alignedTo}. - * - * @param {@link BoundedWindow} subclass used to represent the windows used */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterProcessingTime extends AfterDelayFromFirstElement { +public class AfterProcessingTime extends AfterDelayFromFirstElement { @Override @Nullable - public Instant getCurrentTime(Trigger.TriggerContext context) { + public Instant getCurrentTime(Trigger.TriggerContext context) { return context.currentProcessingTime(); } @@ -54,24 +52,24 @@ private AfterProcessingTime(List> transfo * Creates a trigger that fires when the current processing time passes the processing time * at which this trigger saw the first element in a pane. */ - public static AfterProcessingTime pastFirstElementInPane() { - return new AfterProcessingTime(IDENTITY); + public static AfterProcessingTime pastFirstElementInPane() { + return new AfterProcessingTime(IDENTITY); } @Override - protected AfterProcessingTime newWith( + protected AfterProcessingTime newWith( List> transforms) { - return new AfterProcessingTime(transforms); + return new AfterProcessingTime(transforms); } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } @Override - protected Trigger getContinuationTrigger(List> continuationTriggers) { - return new AfterSynchronizedProcessingTime(); + protected Trigger getContinuationTrigger(List continuationTriggers) { + return new AfterSynchronizedProcessingTime(); } @Override @@ -87,7 +85,7 @@ public boolean equals(Object obj) { if (!(obj instanceof AfterProcessingTime)) { return false; } - AfterProcessingTime that = (AfterProcessingTime) obj; + AfterProcessingTime that = (AfterProcessingTime) obj; return Objects.equals(this.timestampMappers, that.timestampMappers); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index 37d624703631..4ccba04a99e5 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -28,12 +28,11 @@ import javax.annotation.Nullable; -class AfterSynchronizedProcessingTime - extends AfterDelayFromFirstElement { +class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { @Override @Nullable - public Instant getCurrentTime(Trigger.TriggerContext context) { + public Instant getCurrentTime(Trigger.TriggerContext context) { return context.currentSynchronizedProcessingTime(); } @@ -43,12 +42,12 @@ public AfterSynchronizedProcessingTime() { } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } @Override - protected Trigger getContinuationTrigger(List> continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { return this; } @@ -68,7 +67,7 @@ public int hashCode() { } @Override - protected AfterSynchronizedProcessingTime + protected AfterSynchronizedProcessingTime newWith(List> transforms) { // ignore transforms return this; diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java index 249a12f0cb45..9ef81868d79a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java @@ -59,11 +59,9 @@ * Additionaly firings before or after the watermark can be requested by calling * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}. - * - * @param {@link BoundedWindow} subclass used to represent the windows used. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterWatermark { +public class AfterWatermark { // Static factory class. private AfterWatermark() {} @@ -71,37 +69,37 @@ private AfterWatermark() {} /** * Creates a trigger that fires when the watermark passes the end of the window. */ - public static FromEndOfWindow pastEndOfWindow() { - return new FromEndOfWindow(); + public static FromEndOfWindow pastEndOfWindow() { + return new FromEndOfWindow(); } /** * Interface for building an AfterWatermarkTrigger with early firings already filled in. */ - public interface AfterWatermarkEarly extends TriggerBuilder { + public interface AfterWatermarkEarly extends TriggerBuilder { /** * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires after the watermark has passed the end of the window. */ - TriggerBuilder withLateFirings(OnceTrigger lateTrigger); + TriggerBuilder withLateFirings(OnceTrigger lateTrigger); } /** * Interface for building an AfterWatermarkTrigger with late firings already filled in. */ - public interface AfterWatermarkLate extends TriggerBuilder { + public interface AfterWatermarkLate extends TriggerBuilder { /** * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires before the watermark has passed the end of the window. */ - TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger); + TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger); } /** * A trigger which never fires. Used for the "early" trigger when only a late trigger was * specified. */ - private static class NeverTrigger extends OnceTrigger { + private static class NeverTrigger extends OnceTrigger { protected NeverTrigger() { super(null); @@ -114,54 +112,54 @@ public void onElement(OnElementContext c) throws Exception { } public void onMerge(OnMergeContext c) throws Exception { } @Override - protected Trigger getContinuationTrigger(List> continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { return this; } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return false; } @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { + protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { throw new UnsupportedOperationException( String.format("%s should never fire", getClass().getSimpleName())); } } - private static class AfterWatermarkEarlyAndLate - extends Trigger - implements TriggerBuilder, AfterWatermarkEarly, AfterWatermarkLate { + private static class AfterWatermarkEarlyAndLate + extends Trigger + implements TriggerBuilder, AfterWatermarkEarly, AfterWatermarkLate { private static final int EARLY_INDEX = 0; private static final int LATE_INDEX = 1; - private final OnceTrigger earlyTrigger; - private final OnceTrigger lateTrigger; + private final OnceTrigger earlyTrigger; + private final OnceTrigger lateTrigger; @SuppressWarnings("unchecked") - private AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { + private AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { super(lateTrigger == null - ? ImmutableList.>of(earlyTrigger) - : ImmutableList.>of(earlyTrigger, lateTrigger)); + ? ImmutableList.of(earlyTrigger) + : ImmutableList.of(earlyTrigger, lateTrigger)); this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null"); this.lateTrigger = lateTrigger; } @Override - public TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger) { - return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); + public TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger) { + return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } @Override - public TriggerBuilder withLateFirings(OnceTrigger lateTrigger) { - return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); + public TriggerBuilder withLateFirings(OnceTrigger lateTrigger) { + return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } @Override @@ -172,7 +170,7 @@ public void onElement(OnElementContext c) throws Exception { } else { // If merges can happen, we run for all subtriggers because they might be // de-activated or re-activated - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { subTrigger.invokeOnElement(c); } } @@ -183,7 +181,7 @@ public void onMerge(OnMergeContext c) throws Exception { // NOTE that the ReduceFnRunner will delete all end-of-window timers for the // merged-away windows. - ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX); + ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX); // We check the early trigger to determine if we are still processing it or // if the end of window has transitioned us to the late trigger OnMergeContext earlyContext = c.forTrigger(earlySubtrigger); @@ -194,7 +192,7 @@ public void onMerge(OnMergeContext c) throws Exception { if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { earlyContext.trigger().setFinished(false); if (lateTrigger != null) { - ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); + ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); OnMergeContext lateContext = c.forTrigger(lateSubtrigger); lateContext.trigger().setFinished(false); lateSubtrigger.invokeClear(lateContext); @@ -209,31 +207,31 @@ public void onMerge(OnMergeContext c) throws Exception { } @Override - public Trigger getContinuationTrigger() { - return new AfterWatermarkEarlyAndLate( + public Trigger getContinuationTrigger() { + return new AfterWatermarkEarlyAndLate( earlyTrigger.getContinuationTrigger(), lateTrigger == null ? null : lateTrigger.getContinuationTrigger()); } @Override - protected Trigger getContinuationTrigger(List> continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { throw new UnsupportedOperationException( - "Should not call getContinuationTrigger(List>)"); + "Should not call getContinuationTrigger(List)"); } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // Even without an early or late trigger, we'll still produce a firing at the watermark. return window.maxTimestamp(); } - private boolean endOfWindowReached(Trigger.TriggerContext context) { + private boolean endOfWindowReached(Trigger.TriggerContext context) { return context.currentEventTime() != null && context.currentEventTime().isAfter(context.window().maxTimestamp()); } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { if (!context.trigger().isFinished(EARLY_INDEX)) { // We have not yet transitioned to late firings. // We should fire if either the trigger is ready or we reach the end of the window. @@ -248,7 +246,7 @@ public boolean shouldFire(Trigger.TriggerContext context) throws Exception { } @Override - public void onFire(Trigger.TriggerContext context) throws Exception { + public void onFire(Trigger.TriggerContext context) throws Exception { if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) { onNonLateFiring(context); } else if (lateTrigger != null) { @@ -259,10 +257,10 @@ public void onFire(Trigger.TriggerContext context) throws Exception { } } - private void onNonLateFiring(Trigger.TriggerContext context) throws Exception { + private void onNonLateFiring(Trigger.TriggerContext context) throws Exception { // We have not yet transitioned to late firings. - ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); - Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger); + ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); + Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger); if (!endOfWindowReached(context)) { // This is an early firing, since we have not arrived at the end of the window @@ -291,9 +289,9 @@ private void onNonLateFiring(Trigger.TriggerContext context) throws Exception } - private void onLateFiring(Trigger.TriggerContext context) throws Exception { + private void onLateFiring(Trigger.TriggerContext context) throws Exception { // We are firing the late trigger, with implicit repeat - ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX); + ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX); lateSubtrigger.invokeOnFire(context); // It is a OnceTrigger, so it must have finished; unfinished it and clear it lateSubtrigger.invokeClear(context); @@ -304,7 +302,7 @@ private void onLateFiring(Trigger.TriggerContext context) throws Exception { /** * A watermark trigger targeted relative to the end of the window. */ - public static class FromEndOfWindow extends OnceTrigger { + public static class FromEndOfWindow extends OnceTrigger { private FromEndOfWindow() { super(null); @@ -314,20 +312,20 @@ private FromEndOfWindow() { * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires before the watermark has passed the end of the window. */ - public AfterWatermarkEarly withEarlyFirings(OnceTrigger earlyFirings) { + public AfterWatermarkEarly withEarlyFirings(OnceTrigger earlyFirings) { Preconditions.checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); - return new AfterWatermarkEarlyAndLate(earlyFirings, null); + return new AfterWatermarkEarlyAndLate(earlyFirings, null); } /** * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever * the given {@code Trigger} fires after the watermark has passed the end of the window. */ - public AfterWatermarkLate withLateFirings(OnceTrigger lateFirings) { + public AfterWatermarkLate withLateFirings(OnceTrigger lateFirings) { Preconditions.checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); - return new AfterWatermarkEarlyAndLate(new NeverTrigger(), lateFirings); + return new AfterWatermarkEarlyAndLate(new NeverTrigger(), lateFirings); } @Override @@ -358,12 +356,12 @@ public void onMerge(OnMergeContext c) throws Exception { } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return window.maxTimestamp(); } @Override - public FromEndOfWindow getContinuationTrigger(List> continuationTriggers) { + public FromEndOfWindow getContinuationTrigger(List continuationTriggers) { return this; } @@ -383,16 +381,16 @@ public int hashCode() { } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return endOfWindowReached(context); } - private boolean endOfWindowReached(Trigger.TriggerContext context) { + private boolean endOfWindowReached(Trigger.TriggerContext context) { return context.currentEventTime() != null && context.currentEventTime().isAfter(context.window().maxTimestamp()); } @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { } + protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { } } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java index ccb3a879d198..0e6b5ce62a63 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java @@ -27,11 +27,9 @@ /** * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details. - * - * @param The type of windows being triggered/encoded. */ @Experimental(Experimental.Kind.TRIGGER) -public class DefaultTrigger extends Trigger{ +public class DefaultTrigger extends Trigger{ private DefaultTrigger() { super(null); @@ -40,8 +38,8 @@ private DefaultTrigger() { /** * Returns the default trigger. */ - public static DefaultTrigger of() { - return new DefaultTrigger(); + public static DefaultTrigger of() { + return new DefaultTrigger(); } @Override @@ -66,31 +64,31 @@ public void onMerge(OnMergeContext c) throws Exception { public void clear(TriggerContext c) throws Exception { } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return window.maxTimestamp(); } @Override - public boolean isCompatible(Trigger other) { + public boolean isCompatible(Trigger other) { // Semantically, all default triggers are identical return other instanceof DefaultTrigger; } @Override - public Trigger getContinuationTrigger(List> continuationTriggers) { + public Trigger getContinuationTrigger(List continuationTriggers) { return this; } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return endOfWindowReached(context); } - private boolean endOfWindowReached(Trigger.TriggerContext context) { + private boolean endOfWindowReached(Trigger.TriggerContext context) { return context.currentEventTime() != null && context.currentEventTime().isAfter(context.window().maxTimestamp()); } @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } + public void onFire(Trigger.TriggerContext context) throws Exception { } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java index b8abf0f8e89f..68b908377a61 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java @@ -28,12 +28,12 @@ /** * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires. */ -class OrFinallyTrigger extends Trigger { +class OrFinallyTrigger extends Trigger { private static final int ACTUAL = 0; private static final int UNTIL = 1; - @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) { + @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) { super(Arrays.asList(actual, until)); } @@ -45,14 +45,14 @@ public void onElement(OnElementContext c) throws Exception { @Override public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { subTrigger.invokeOnMerge(c); } updateFinishedState(c); } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger fires once either the trigger or the until trigger fires. Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window); Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window); @@ -60,25 +60,25 @@ public Instant getWatermarkThatGuaranteesFiring(W window) { } @Override - public Trigger getContinuationTrigger(List> continuationTriggers) { + public Trigger getContinuationTrigger(List continuationTriggers) { // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL // may not be a OnceTrigger. return Repeatedly.forever( - new OrFinallyTrigger( + new OrFinallyTrigger( continuationTriggers.get(ACTUAL), - (Trigger.OnceTrigger) continuationTriggers.get(UNTIL))); + (Trigger.OnceTrigger) continuationTriggers.get(UNTIL))); } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context) || context.trigger().subTrigger(UNTIL).invokeShouldFire(context); } @Override - public void onFire(Trigger.TriggerContext context) throws Exception { - ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL); - ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL); + public void onFire(Trigger.TriggerContext context) throws Exception { + ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL); + ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL); if (untilSubtrigger.invokeShouldFire(context)) { untilSubtrigger.invokeOnFire(context); @@ -94,7 +94,7 @@ public void onFire(Trigger.TriggerContext context) throws Exception { private void updateFinishedState(TriggerContext c) throws Exception { boolean anyStillFinished = false; - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished(); } c.trigger().setFinished(anyStillFinished); diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java index 9be0259dea14..988f0a419cd8 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java @@ -34,11 +34,8 @@ * *

    {@code Repeatedly.forever(someTrigger)} behaves like an infinite * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}. - * - * @param {@link BoundedWindow} subclass used to represent the windows used by this - * {@code Trigger} */ -public class Repeatedly extends Trigger { +public class Repeatedly extends Trigger { private static final int REPEATED = 0; @@ -50,11 +47,11 @@ public class Repeatedly extends Trigger { * * @param repeated the trigger to execute repeatedly. */ - public static Repeatedly forever(Trigger repeated) { - return new Repeatedly(repeated); + public static Repeatedly forever(Trigger repeated) { + return new Repeatedly(repeated); } - private Repeatedly(Trigger repeated) { + private Repeatedly(Trigger repeated) { super(Arrays.asList(repeated)); } @@ -70,18 +67,18 @@ public void onMerge(OnMergeContext c) throws Exception { } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger fires once the repeated trigger fires. return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window); } @Override - public Trigger getContinuationTrigger(List> continuationTriggers) { - return new Repeatedly(continuationTriggers.get(REPEATED)); + public Trigger getContinuationTrigger(List continuationTriggers) { + return new Repeatedly(continuationTriggers.get(REPEATED)); } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return getRepeated(context).invokeShouldFire(context); } @@ -95,7 +92,7 @@ public void onFire(TriggerContext context) throws Exception { } } - private ExecutableTrigger getRepeated(TriggerContext context) { + private ExecutableTrigger getRepeated(TriggerContext context) { return context.trigger().subTrigger(REPEATED); } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java index 8b1a8bc90cd2..fde8ca4455c6 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java @@ -90,18 +90,15 @@ *

    Triggers should not build up any state internally since they may be recreated * between invocations of the callbacks. All important values should be persisted using * state before the callback returns. - * - * @param {@link BoundedWindow} subclass used to represent the windows used by this - * {@code Trigger} */ @Experimental(Experimental.Kind.TRIGGER) -public abstract class Trigger implements Serializable, TriggerBuilder { +public abstract class Trigger implements Serializable, TriggerBuilder { /** * Interface for accessing information about the trigger being executed and other triggers in the * same tree. */ - public interface TriggerInfo { + public interface TriggerInfo { /** * Returns true if the windowing strategy of the current {@code PCollection} is a merging @@ -114,12 +111,12 @@ public interface TriggerInfo { /** * Access the executable versions of the sub-triggers of the current trigger. */ - Iterable> subTriggers(); + Iterable subTriggers(); /** * Access the executable version of the specified sub-trigger. */ - ExecutableTrigger subTrigger(int subtriggerIndex); + ExecutableTrigger subTrigger(int subtriggerIndex); /** * Returns true if the current trigger is marked finished. @@ -139,12 +136,12 @@ public interface TriggerInfo { /** * Returns an iterable over the unfinished sub-triggers of the current trigger. */ - Iterable> unfinishedSubTriggers(); + Iterable unfinishedSubTriggers(); /** * Returns the first unfinished sub-trigger. */ - ExecutableTrigger firstUnfinishedSubTrigger(); + ExecutableTrigger firstUnfinishedSubTrigger(); /** * Clears all keyed state for triggers in the current sub-tree and unsets all the associated @@ -167,7 +164,7 @@ public interface TriggerInfo { * Interact with properties of the trigger being executed, with extensions to deal with the * merging windows. */ - public interface MergingTriggerInfo extends TriggerInfo { + public interface MergingTriggerInfo extends TriggerInfo { /** Return true if the trigger is finished in any window being merged. */ public abstract boolean finishedInAnyMergingWindow(); @@ -176,7 +173,7 @@ public interface MergingTriggerInfo extends TriggerInfo public abstract boolean finishedInAllMergingWindows(); /** Return the merging windows in which the trigger is finished. */ - public abstract Iterable getFinishedMergingWindows(); + public abstract Iterable getFinishedMergingWindows(); } /** @@ -188,16 +185,16 @@ public interface MergingTriggerInfo extends TriggerInfo public abstract class TriggerContext { /** Returns the interface for accessing trigger info. */ - public abstract TriggerInfo trigger(); + public abstract TriggerInfo trigger(); /** Returns the interface for accessing persistent state. */ public abstract StateAccessor state(); /** The window that the current context is executing in. */ - public abstract W window(); + public abstract BoundedWindow window(); /** Create a sub-context for the given sub-trigger. */ - public abstract TriggerContext forTrigger(ExecutableTrigger trigger); + public abstract TriggerContext forTrigger(ExecutableTrigger trigger); /** * Removes the timer set in this trigger context for the given {@link Instant} @@ -240,7 +237,7 @@ public abstract class OnElementContext extends TriggerContext { /** Create an {@code OnElementContext} for executing the given trigger. */ @Override - public abstract OnElementContext forTrigger(ExecutableTrigger trigger); + public abstract OnElementContext forTrigger(ExecutableTrigger trigger); } /** @@ -263,19 +260,19 @@ public abstract class OnMergeContext extends TriggerContext { /** Create an {@code OnMergeContext} for executing the given trigger. */ @Override - public abstract OnMergeContext forTrigger(ExecutableTrigger trigger); + public abstract OnMergeContext forTrigger(ExecutableTrigger trigger); @Override - public abstract MergingStateAccessor state(); + public abstract MergingStateAccessor state(); @Override - public abstract MergingTriggerInfo trigger(); + public abstract MergingTriggerInfo trigger(); } @Nullable - protected final List> subTriggers; + protected final List subTriggers; - protected Trigger(@Nullable List> subTriggers) { + protected Trigger(@Nullable List subTriggers) { this.subTriggers = subTriggers; } @@ -322,7 +319,7 @@ protected Trigger(@Nullable List> subTriggers) { */ public void prefetchOnElement(StateAccessor state) { if (subTriggers != null) { - for (Trigger trigger : subTriggers) { + for (Trigger trigger : subTriggers) { trigger.prefetchOnElement(state); } } @@ -332,9 +329,9 @@ public void prefetchOnElement(StateAccessor state) { * Called to allow the trigger to prefetch any state it will likely need to read from during * an {@link #onMerge} call. */ - public void prefetchOnMerge(MergingStateAccessor state) { + public void prefetchOnMerge(MergingStateAccessor state) { if (subTriggers != null) { - for (Trigger trigger : subTriggers) { + for (Trigger trigger : subTriggers) { trigger.prefetchOnMerge(state); } } @@ -346,7 +343,7 @@ public void prefetchOnMerge(MergingStateAccessor state) { */ public void prefetchShouldFire(StateAccessor state) { if (subTriggers != null) { - for (Trigger trigger : subTriggers) { + for (Trigger trigger : subTriggers) { trigger.prefetchShouldFire(state); } } @@ -358,7 +355,7 @@ public void prefetchShouldFire(StateAccessor state) { */ public void prefetchOnFire(StateAccessor state) { if (subTriggers != null) { - for (Trigger trigger : subTriggers) { + for (Trigger trigger : subTriggers) { trigger.prefetchOnFire(state); } } @@ -373,13 +370,13 @@ public void prefetchOnFire(StateAccessor state) { */ public void clear(TriggerContext c) throws Exception { if (subTriggers != null) { - for (ExecutableTrigger trigger : c.trigger().subTriggers()) { + for (ExecutableTrigger trigger : c.trigger().subTriggers()) { trigger.invokeClear(c); } } } - public Iterable> subTriggers() { + public Iterable subTriggers() { return subTriggers; } @@ -390,13 +387,13 @@ public Iterable> subTriggers() { * speculative results. Triggers that fire once (or multiple times) should * continue firing once (or multiple times). */ - public Trigger getContinuationTrigger() { + public Trigger getContinuationTrigger() { if (subTriggers == null) { return getContinuationTrigger(null); } - List> subTriggerContinuations = new ArrayList<>(); - for (Trigger subTrigger : subTriggers) { + List subTriggerContinuations = new ArrayList<>(); + for (Trigger subTrigger : subTriggers) { subTriggerContinuations.add(subTrigger.getContinuationTrigger()); } return getContinuationTrigger(subTriggerContinuations); @@ -406,7 +403,7 @@ public Trigger getContinuationTrigger() { * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this * is provided the continuation trigger of each of the sub-triggers. */ - protected abstract Trigger getContinuationTrigger(List> continuationTriggers); + protected abstract Trigger getContinuationTrigger(List continuationTriggers); /** * Returns a bound in watermark time by which this trigger would have fired at least once @@ -419,12 +416,12 @@ public Trigger getContinuationTrigger() { *

    This estimate is used to determine that there are no elements in a side-input window, which * causes the default value to be used instead. */ - public abstract Instant getWatermarkThatGuaranteesFiring(W window); + public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window); /** * Returns whether this performs the same triggering as the given {@code Trigger}. */ - public boolean isCompatible(Trigger other) { + public boolean isCompatible(Trigger other) { if (!getClass().equals(other.getClass())) { return false; } @@ -467,8 +464,7 @@ public boolean equals(Object obj) { if (!(obj instanceof Trigger)) { return false; } - @SuppressWarnings("unchecked") - Trigger that = (Trigger) obj; + Trigger that = (Trigger) obj; return Objects.equals(getClass(), that.getClass()) && Objects.equals(subTriggers, that.subTriggers); } @@ -497,34 +493,31 @@ public int hashCode() { *

    Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same * as {@code AfterFirst.of(t1, t2)}. */ - public Trigger orFinally(OnceTrigger until) { - return new OrFinallyTrigger(this, until); + public Trigger orFinally(OnceTrigger until) { + return new OrFinallyTrigger(this, until); } @Override - public Trigger buildTrigger() { + public Trigger buildTrigger() { return this; } /** * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather * than the general {@link Trigger} class to indicate that behavior. - * - * @param {@link BoundedWindow} subclass used to represent the windows used by this - * {@code AtMostOnceTrigger} */ - public abstract static class OnceTrigger extends Trigger { - protected OnceTrigger(List> subTriggers) { + public abstract static class OnceTrigger extends Trigger { + protected OnceTrigger(List subTriggers) { super(subTriggers); } @Override - public final OnceTrigger getContinuationTrigger() { - Trigger continuation = super.getContinuationTrigger(); + public final OnceTrigger getContinuationTrigger() { + Trigger continuation = super.getContinuationTrigger(); if (!(continuation instanceof OnceTrigger)) { throw new IllegalStateException("Continuation of a OnceTrigger must be a OnceTrigger"); } - return (OnceTrigger) continuation; + return (OnceTrigger) continuation; } /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java index dcc042c83d8b..017fd87cda21 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerBuilder.java @@ -22,10 +22,8 @@ * *

    This includes {@code Trigger}s (which can return themselves) and any "enhanced" syntax for * constructing a trigger. - * - * @param The type of windows the built trigger will operate on. */ -public interface TriggerBuilder { +public interface TriggerBuilder { /** Return the {@code Trigger} built by this builder. */ - Trigger buildTrigger(); + Trigger buildTrigger(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java index 20b3ed5895e4..aef61b2e8761 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java @@ -194,7 +194,7 @@ public static Bound into(WindowFn fn) { * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. */ @Experimental(Kind.TRIGGER) - public static Bound triggering(TriggerBuilder trigger) { + public static Bound triggering(TriggerBuilder trigger) { return new Unbound().triggering(trigger); } @@ -289,7 +289,7 @@ public Bound into(WindowFn fn) { * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. */ @Experimental(Kind.TRIGGER) - public Bound triggering(TriggerBuilder trigger) { + public Bound triggering(TriggerBuilder trigger) { return new Bound(name).triggering(trigger); } @@ -361,16 +361,20 @@ public static class Bound extends PTransform, PCollection> @Nullable private final WindowFn windowFn; - @Nullable private final Trigger trigger; + @Nullable private final Trigger trigger; @Nullable private final AccumulationMode mode; @Nullable private final Duration allowedLateness; @Nullable private final ClosingBehavior closingBehavior; @Nullable private final OutputTimeFn outputTimeFn; - private Bound(String name, - @Nullable WindowFn windowFn, @Nullable Trigger trigger, - @Nullable AccumulationMode mode, @Nullable Duration allowedLateness, - ClosingBehavior behavior, @Nullable OutputTimeFn outputTimeFn) { + private Bound( + String name, + @Nullable WindowFn windowFn, + @Nullable Trigger trigger, + @Nullable AccumulationMode mode, + @Nullable Duration allowedLateness, + ClosingBehavior behavior, + @Nullable OutputTimeFn outputTimeFn) { super(name); this.windowFn = windowFn; this.trigger = trigger; @@ -428,7 +432,7 @@ public Bound named(String name) { * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. */ @Experimental(Kind.TRIGGER) - public Bound triggering(TriggerBuilder trigger) { + public Bound triggering(TriggerBuilder trigger) { return new Bound( name, windowFn, diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java index 25a630a88da8..71c8237eb75b 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ExecutableTrigger.java @@ -30,42 +30,40 @@ * A wrapper around a trigger used during execution. While an actual trigger may appear multiple * times (both in the same trigger expression and in other trigger expressions), the * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence). - * - * @param {@link BoundedWindow} subclass used to represent the windows used. */ -public class ExecutableTrigger implements Serializable { +public class ExecutableTrigger implements Serializable { /** Store the index assigned to this trigger. */ private final int triggerIndex; private final int firstIndexAfterSubtree; - private final List> subTriggers = new ArrayList<>(); - private final Trigger trigger; + private final List subTriggers = new ArrayList<>(); + private final Trigger trigger; - public static ExecutableTrigger create(Trigger trigger) { + public static ExecutableTrigger create(Trigger trigger) { return create(trigger, 0); } - private static ExecutableTrigger create( - Trigger trigger, int nextUnusedIndex) { + private static ExecutableTrigger create( + Trigger trigger, int nextUnusedIndex) { if (trigger instanceof OnceTrigger) { - return new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex); + return new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex); } else { - return new ExecutableTrigger(trigger, nextUnusedIndex); + return new ExecutableTrigger(trigger, nextUnusedIndex); } } - public static ExecutableTrigger createForOnceTrigger( - OnceTrigger trigger, int nextUnusedIndex) { - return new ExecutableOnceTrigger(trigger, nextUnusedIndex); + public static ExecutableTrigger createForOnceTrigger( + OnceTrigger trigger, int nextUnusedIndex) { + return new ExecutableOnceTrigger(trigger, nextUnusedIndex); } - private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) { + private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) { this.trigger = Preconditions.checkNotNull(trigger, "trigger must not be null"); this.triggerIndex = nextUnusedIndex++; if (trigger.subTriggers() != null) { - for (Trigger subTrigger : trigger.subTriggers()) { - ExecutableTrigger subExecutable = create(subTrigger, nextUnusedIndex); + for (Trigger subTrigger : trigger.subTriggers()) { + ExecutableTrigger subExecutable = create(subTrigger, nextUnusedIndex); subTriggers.add(subExecutable); nextUnusedIndex = subExecutable.firstIndexAfterSubtree; } @@ -73,7 +71,7 @@ private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) { firstIndexAfterSubtree = nextUnusedIndex; } - public List> subTriggers() { + public List subTriggers() { return subTriggers; } @@ -85,7 +83,7 @@ public String toString() { /** * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}. */ - public Trigger getSpec() { + public Trigger getSpec() { return trigger; } @@ -97,16 +95,16 @@ public final int getFirstIndexAfterSubtree() { return firstIndexAfterSubtree; } - public boolean isCompatible(ExecutableTrigger other) { + public boolean isCompatible(ExecutableTrigger other) { return trigger.isCompatible(other.trigger); } - public ExecutableTrigger getSubTriggerContaining(int index) { + public ExecutableTrigger getSubTriggerContaining(int index) { Preconditions.checkNotNull(subTriggers); Preconditions.checkState(index > triggerIndex && index < firstIndexAfterSubtree, "Cannot find sub-trigger containing index not in this tree."); - ExecutableTrigger previous = null; - for (ExecutableTrigger subTrigger : subTriggers) { + ExecutableTrigger previous = null; + for (ExecutableTrigger subTrigger : subTriggers) { if (index < subTrigger.triggerIndex) { return previous; } @@ -119,7 +117,7 @@ public ExecutableTrigger getSubTriggerContaining(int index) { * Invoke the {@link Trigger#onElement} method for this trigger, ensuring that the bits are * properly updated if the trigger finishes. */ - public void invokeOnElement(Trigger.OnElementContext c) throws Exception { + public void invokeOnElement(Trigger.OnElementContext c) throws Exception { trigger.onElement(c.forTrigger(this)); } @@ -127,23 +125,23 @@ public void invokeOnElement(Trigger.OnElementContext c) throws Exception { * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that the bits are properly * updated. */ - public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception { - Trigger.OnMergeContext subContext = c.forTrigger(this); + public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception { + Trigger.OnMergeContext subContext = c.forTrigger(this); trigger.onMerge(subContext); } - public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception { + public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception { return trigger.shouldFire(c.forTrigger(this)); } - public void invokeOnFire(Trigger.TriggerContext c) throws Exception { + public void invokeOnFire(Trigger.TriggerContext c) throws Exception { trigger.onFire(c.forTrigger(this)); } /** * Invoke clear for the current this trigger. */ - public void invokeClear(Trigger.TriggerContext c) throws Exception { + public void invokeClear(Trigger.TriggerContext c) throws Exception { trigger.clear(c.forTrigger(this)); } @@ -151,9 +149,9 @@ public void invokeClear(Trigger.TriggerContext c) throws Exception { * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH * and never just FIRE. */ - private static class ExecutableOnceTrigger extends ExecutableTrigger { + private static class ExecutableOnceTrigger extends ExecutableTrigger { - public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) { + public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) { super(trigger, nextUnusedIndex); } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java index 76b54e53f27b..e0f14de4be68 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggers.java @@ -25,17 +25,17 @@ public interface FinishedTriggers { /** * Returns {@code true} if the trigger is finished. */ - public boolean isFinished(ExecutableTrigger trigger); + public boolean isFinished(ExecutableTrigger trigger); /** * Sets the fact that the trigger is finished. */ - public void setFinished(ExecutableTrigger trigger, boolean value); + public void setFinished(ExecutableTrigger trigger, boolean value); /** * Sets the trigger and all of its subtriggers to unfinished. */ - public void clearRecursively(ExecutableTrigger trigger); + public void clearRecursively(ExecutableTrigger trigger); /** * Create an independent copy of this mutable {@link FinishedTriggers}. diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java index f31e64d5e6f1..69e1b84822a7 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersBitSet.java @@ -46,17 +46,17 @@ public BitSet getBitSet() { } @Override - public boolean isFinished(ExecutableTrigger trigger) { + public boolean isFinished(ExecutableTrigger trigger) { return bitSet.get(trigger.getTriggerIndex()); } @Override - public void setFinished(ExecutableTrigger trigger, boolean value) { + public void setFinished(ExecutableTrigger trigger, boolean value) { bitSet.set(trigger.getTriggerIndex(), value); } @Override - public void clearRecursively(ExecutableTrigger trigger) { + public void clearRecursively(ExecutableTrigger trigger) { bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree()); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSet.java index d0d8329cb0ab..c6ec7a169f48 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSet.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSet.java @@ -26,30 +26,30 @@ */ public class FinishedTriggersSet implements FinishedTriggers { - private final Set> finishedTriggers; + private final Set finishedTriggers; - private FinishedTriggersSet(Set> finishedTriggers) { + private FinishedTriggersSet(Set finishedTriggers) { this.finishedTriggers = finishedTriggers; } - public static FinishedTriggersSet fromSet(Set> finishedTriggers) { + public static FinishedTriggersSet fromSet(Set finishedTriggers) { return new FinishedTriggersSet(finishedTriggers); } /** * Returns a mutable {@link Set} of the underlying triggers that are finished. */ - public Set> getFinishedTriggers() { + public Set getFinishedTriggers() { return finishedTriggers; } @Override - public boolean isFinished(ExecutableTrigger trigger) { + public boolean isFinished(ExecutableTrigger trigger) { return finishedTriggers.contains(trigger); } @Override - public void setFinished(ExecutableTrigger trigger, boolean value) { + public void setFinished(ExecutableTrigger trigger, boolean value) { if (value) { finishedTriggers.add(trigger); } else { @@ -58,9 +58,9 @@ public void setFinished(ExecutableTrigger trigger, boolean value) { } @Override - public void clearRecursively(ExecutableTrigger trigger) { + public void clearRecursively(ExecutableTrigger trigger) { finishedTriggers.remove(trigger); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { clearRecursively(subTrigger); } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java index 98a9d945c761..d7725661df06 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReshuffleTrigger.java @@ -30,34 +30,34 @@ * * @param The kind of window that is being reshuffled. */ -public class ReshuffleTrigger extends Trigger { +public class ReshuffleTrigger extends Trigger { ReshuffleTrigger() { super(null); } @Override - public void onElement(Trigger.OnElementContext c) { } + public void onElement(Trigger.OnElementContext c) { } @Override - public void onMerge(Trigger.OnMergeContext c) { } + public void onMerge(Trigger.OnMergeContext c) { } @Override - protected Trigger getContinuationTrigger(List> continuationTriggers) { + protected Trigger getContinuationTrigger(List continuationTriggers) { return this; } @Override - public Instant getWatermarkThatGuaranteesFiring(W window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { throw new UnsupportedOperationException( "ReshuffleTrigger should not be used outside of Reshuffle"); } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return true; } @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } + public void onFire(Trigger.TriggerContext context) throws Exception { } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java index 2408e9e02b8e..5e2dc5ed9c5e 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java @@ -65,40 +65,40 @@ public TriggerContextFactory(WindowingStrategy windowingStrategy, this.windowCoder = windowingStrategy.getWindowFn().windowCoder(); } - public Trigger.TriggerContext base(W window, Timers timers, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { + public Trigger.TriggerContext base(W window, Timers timers, + ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { return new TriggerContextImpl(window, timers, rootTrigger, finishedSet); } - public Trigger.OnElementContext createOnElementContext( + public Trigger.OnElementContext createOnElementContext( W window, Timers timers, Instant elementTimestamp, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { + ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp); } - public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet, + public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers, + ExecutableTrigger rootTrigger, FinishedTriggers finishedSet, Map finishedSets) { return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets); } - public StateAccessor createStateAccessor(W window, ExecutableTrigger trigger) { + public StateAccessor createStateAccessor(W window, ExecutableTrigger trigger) { return new StateAccessorImpl(window, trigger); } public MergingStateAccessor createMergingStateAccessor( - W mergeResult, Collection mergingWindows, ExecutableTrigger trigger) { + W mergeResult, Collection mergingWindows, ExecutableTrigger trigger) { return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult); } - private class TriggerInfoImpl implements Trigger.TriggerInfo { + private class TriggerInfoImpl implements Trigger.TriggerInfo { - protected final ExecutableTrigger trigger; + protected final ExecutableTrigger trigger; protected final FinishedTriggers finishedSet; - private final Trigger.TriggerContext context; + private final Trigger.TriggerContext context; - public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet, - Trigger.TriggerContext context) { + public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet, + Trigger.TriggerContext context) { this.trigger = trigger; this.finishedSet = finishedSet; this.context = context; @@ -110,12 +110,12 @@ public boolean isMerging() { } @Override - public Iterable> subTriggers() { + public Iterable subTriggers() { return trigger.subTriggers(); } @Override - public ExecutableTrigger subTrigger(int subtriggerIndex) { + public ExecutableTrigger subTrigger(int subtriggerIndex) { return trigger.subTriggers().get(subtriggerIndex); } @@ -135,20 +135,20 @@ public boolean areAllSubtriggersFinished() { } @Override - public Iterable> unfinishedSubTriggers() { + public Iterable unfinishedSubTriggers() { return FluentIterable .from(trigger.subTriggers()) - .filter(new Predicate>() { + .filter(new Predicate() { @Override - public boolean apply(ExecutableTrigger trigger) { + public boolean apply(ExecutableTrigger trigger) { return !finishedSet.isFinished(trigger); } }); } @Override - public ExecutableTrigger firstUnfinishedSubTrigger() { - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + public ExecutableTrigger firstUnfinishedSubTrigger() { + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { if (!finishedSet.isFinished(subTrigger)) { return subTrigger; } @@ -217,14 +217,14 @@ public Instant currentEventTime() { } private class MergingTriggerInfoImpl - extends TriggerInfoImpl implements Trigger.MergingTriggerInfo { + extends TriggerInfoImpl implements Trigger.MergingTriggerInfo { private final Map finishedSets; public MergingTriggerInfoImpl( - ExecutableTrigger trigger, + ExecutableTrigger trigger, FinishedTriggers finishedSet, - Trigger.TriggerContext context, + Trigger.TriggerContext context, Map finishedSets) { super(trigger, finishedSet, context); this.finishedSets = finishedSets; @@ -251,7 +251,7 @@ public boolean finishedInAllMergingWindows() { } @Override - public Iterable getFinishedMergingWindows() { + public Iterable getFinishedMergingWindows() { return Maps.filterValues(finishedSets, new Predicate() { @Override public boolean apply(FinishedTriggers finishedSet) { @@ -267,7 +267,7 @@ private class StateAccessorImpl implements StateAccessor { public StateAccessorImpl( W window, - ExecutableTrigger trigger) { + ExecutableTrigger trigger) { this.triggerIndex = trigger.getTriggerIndex(); this.windowNamespace = namespaceFor(window); } @@ -286,7 +286,7 @@ private class MergingStateAccessorImpl extends StateAccessorImpl implements MergingStateAccessor { private final Collection activeToBeMerged; - public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection activeToBeMerged, + public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection activeToBeMerged, W mergeResult) { super(mergeResult, trigger); this.activeToBeMerged = activeToBeMerged; @@ -310,7 +310,7 @@ public Map accessInEachMergingWindow( } } - private class TriggerContextImpl extends Trigger.TriggerContext { + private class TriggerContextImpl extends Trigger.TriggerContext { private final W window; private final StateAccessorImpl state; @@ -320,7 +320,7 @@ private class TriggerContextImpl extends Trigger.TriggerContext { private TriggerContextImpl( W window, Timers timers, - ExecutableTrigger trigger, + ExecutableTrigger trigger, FinishedTriggers finishedSet) { trigger.getSpec().super(); this.window = window; @@ -330,17 +330,17 @@ private TriggerContextImpl( } @Override - public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) { + public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) { return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet); } @Override - public TriggerInfo trigger() { + public TriggerInfo trigger() { return triggerInfo; } @Override - public StateAccessor state() { + public StateAccessor state() { return state; } @@ -372,7 +372,7 @@ public Instant currentEventTime() { } } - private class OnElementContextImpl extends Trigger.OnElementContext { + private class OnElementContextImpl extends Trigger.OnElementContext { private final W window; private final StateAccessorImpl state; @@ -383,7 +383,7 @@ private class OnElementContextImpl extends Trigger.OnElementContext { private OnElementContextImpl( W window, Timers timers, - ExecutableTrigger trigger, + ExecutableTrigger trigger, FinishedTriggers finishedSet, Instant eventTimestamp) { trigger.getSpec().super(); @@ -401,18 +401,18 @@ public Instant eventTimestamp() { } @Override - public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) { + public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) { return new OnElementContextImpl( window, timers, trigger, triggerInfo.finishedSet, eventTimestamp); } @Override - public TriggerInfo trigger() { + public TriggerInfo trigger() { return triggerInfo; } @Override - public StateAccessor state() { + public StateAccessor state() { return state; } @@ -450,7 +450,7 @@ public Instant currentEventTime() { } } - private class OnMergeContextImpl extends Trigger.OnMergeContext { + private class OnMergeContextImpl extends Trigger.OnMergeContext { private final MergingStateAccessor state; private final W window; private final Collection mergingWindows; @@ -460,7 +460,7 @@ private class OnMergeContextImpl extends Trigger.OnMergeContext { private OnMergeContextImpl( W window, Timers timers, - ExecutableTrigger trigger, + ExecutableTrigger trigger, FinishedTriggers finishedSet, Map finishedSets) { trigger.getSpec().super(); @@ -472,7 +472,7 @@ private OnMergeContextImpl( } @Override - public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) { + public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) { return new OnMergeContextImpl( window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets); } @@ -483,7 +483,7 @@ public MergingStateAccessor state() { } @Override - public MergingTriggerInfo trigger() { + public MergingTriggerInfo trigger() { return triggerInfo; } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index aa7e1010e50d..24e92f800d86 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -61,10 +61,10 @@ public class TriggerRunner { static final StateTag> FINISHED_BITS_TAG = StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of())); - private final ExecutableTrigger rootTrigger; + private final ExecutableTrigger rootTrigger; private final TriggerContextFactory contextFactory; - public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory contextFactory) { + public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory contextFactory) { Preconditions.checkState(rootTrigger.getTriggerIndex() == 0); this.rootTrigger = rootTrigger; this.contextFactory = contextFactory; @@ -129,7 +129,7 @@ public void processValue(W window, Instant timestamp, Timers timers, StateAccess // Clone so that we can detect changes and so that changes here don't pollute merging. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); - Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext( + Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext( window, timers, timestamp, rootTrigger, finishedSet); rootTrigger.invokeOnElement(triggerContext); persistFinishedSet(state, finishedSet); @@ -165,7 +165,7 @@ public void onMerge(W window, Timers timers, MergingStateAccessor state) t } ImmutableMap mergingFinishedSets = builder.build(); - Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext( + Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext( window, timers, rootTrigger, finishedSet, mergingFinishedSets); // Run the merge from the trigger @@ -176,7 +176,7 @@ public void onMerge(W window, Timers timers, MergingStateAccessor state) t public boolean shouldFire(W window, Timers timers, StateAccessor state) throws Exception { FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); - Trigger.TriggerContext context = contextFactory.base(window, timers, + Trigger.TriggerContext context = contextFactory.base(window, timers, rootTrigger, finishedSet); return rootTrigger.invokeShouldFire(context); } @@ -186,7 +186,7 @@ public void onFire(W window, Timers timers, StateAccessor state) throws Excep // However it is too expensive to assert. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); - Trigger.TriggerContext context = contextFactory.base(window, timers, + Trigger.TriggerContext context = contextFactory.base(window, timers, rootTrigger, finishedSet); rootTrigger.invokeOnFire(context); persistFinishedSet(state, finishedSet); diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WindowingStrategy.java index 8b78ce415561..6068656da438 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WindowingStrategy.java @@ -57,7 +57,7 @@ public enum AccumulationMode { private final WindowFn windowFn; private final OutputTimeFn outputTimeFn; - private final ExecutableTrigger trigger; + private final ExecutableTrigger trigger; private final AccumulationMode mode; private final Duration allowedLateness; private final ClosingBehavior closingBehavior; @@ -68,7 +68,7 @@ public enum AccumulationMode { private WindowingStrategy( WindowFn windowFn, - ExecutableTrigger trigger, boolean triggerSpecified, + ExecutableTrigger trigger, boolean triggerSpecified, AccumulationMode mode, boolean modeSpecified, Duration allowedLateness, boolean allowedLatenessSpecified, OutputTimeFn outputTimeFn, boolean outputTimeFnSpecified, @@ -106,7 +106,7 @@ public WindowFn getWindowFn() { return windowFn; } - public ExecutableTrigger getTrigger() { + public ExecutableTrigger getTrigger() { return trigger; } @@ -146,12 +146,10 @@ public boolean isOutputTimeFnSpecified() { * Returns a {@link WindowingStrategy} identical to {@code this} but with the trigger set to * {@code wildcardTrigger}. */ - public WindowingStrategy withTrigger(Trigger wildcardTrigger) { - @SuppressWarnings("unchecked") - Trigger typedTrigger = (Trigger) wildcardTrigger; + public WindowingStrategy withTrigger(Trigger trigger) { return new WindowingStrategy( windowFn, - ExecutableTrigger.create(typedTrigger), true, + ExecutableTrigger.create(trigger), true, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, outputTimeFn, outputTimeFnSpecified, diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAllTest.java index 5876d94422a8..049944873a89 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAllTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAllTest.java @@ -141,9 +141,9 @@ public void testFireDeadline() throws Exception { @Test public void testContinuation() throws Exception { - OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); - Trigger afterAll = AfterAll.of(trigger1, trigger2); + OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); + Trigger afterAll = AfterAll.of(trigger1, trigger2); assertEquals( AfterAll.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), afterAll.getContinuationTrigger()); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEachTest.java index 185ae0d22b72..5dde528758f1 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEachTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEachTest.java @@ -112,9 +112,9 @@ public void testFireDeadline() throws Exception { @Test public void testContinuation() throws Exception { - OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); - Trigger afterEach = AfterEach.inOrder(trigger1, trigger2); + OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); + Trigger afterEach = AfterEach.inOrder(trigger1, trigger2); assertEquals( Repeatedly.forever(AfterFirst.of( trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())), diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirstTest.java index bc6a2b85bd03..248310713283 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirstTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirstTest.java @@ -42,11 +42,11 @@ @RunWith(JUnit4.class) public class AfterFirstTest { - @Mock private OnceTrigger mockTrigger1; - @Mock private OnceTrigger mockTrigger2; + @Mock private OnceTrigger mockTrigger1; + @Mock private OnceTrigger mockTrigger2; private SimpleTriggerTester tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito..TriggerContext>any(); + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.any(); } @Before @@ -166,9 +166,9 @@ public void testFireDeadline() throws Exception { @Test public void testContinuation() throws Exception { - OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); - Trigger afterFirst = AfterFirst.of(trigger1, trigger2); + OnceTrigger trigger1 = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger trigger2 = AfterWatermark.pastEndOfWindow(); + Trigger afterFirst = AfterFirst.of(trigger1, trigger2); assertEquals( AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), afterFirst.getContinuationTrigger()); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTimeTest.java index 75677b5a77c4..04f37a1467c5 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -137,10 +137,10 @@ public void testFireDeadline() throws Exception { @Test public void testContinuation() throws Exception { - OnceTrigger firstElementPlus1 = + OnceTrigger firstElementPlus1 = AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)); assertEquals( - new AfterSynchronizedProcessingTime<>(), + new AfterSynchronizedProcessingTime(), firstElementPlus1.getContinuationTrigger()); } @@ -149,9 +149,9 @@ public void testContinuation() throws Exception { */ @Test public void testCompatibilityIdentical() throws Exception { - Trigger t1 = AfterProcessingTime.pastFirstElementInPane() + Trigger t1 = AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1L)); - Trigger t2 = AfterProcessingTime.pastFirstElementInPane() + Trigger t2 = AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1L)); assertTrue(t1.isCompatible(t2)); } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java index c9d09f73498c..047f0ca75c65 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java @@ -36,8 +36,7 @@ @RunWith(JUnit4.class) public class AfterSynchronizedProcessingTimeTest { - private Trigger underTest = - new AfterSynchronizedProcessingTime(); + private Trigger underTest = new AfterSynchronizedProcessingTime(); @Test public void testAfterProcessingTimeWithFixedWindows() throws Exception { diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermarkTest.java index ca909f96614a..7417953ed06e 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermarkTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermarkTest.java @@ -42,15 +42,15 @@ @RunWith(JUnit4.class) public class AfterWatermarkTest { - @Mock private OnceTrigger mockEarly; - @Mock private OnceTrigger mockLate; + @Mock private OnceTrigger mockEarly; + @Mock private OnceTrigger mockLate; private SimpleTriggerTester tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito..TriggerContext>any(); + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.any(); } - private static Trigger.OnElementContext anyElementContext() { - return Mockito..OnElementContext>any(); + private static Trigger.OnElementContext anyElementContext() { + return Mockito.any(); } private void injectElements(int... elements) throws Exception { @@ -66,7 +66,7 @@ public void setUp() { MockitoAnnotations.initMocks(this); } - public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window) + public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window) throws Exception { // Don't fire due to mock saying no diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTriggerTest.java index 86358739302f..22dde66584fc 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTriggerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTriggerTest.java @@ -46,7 +46,7 @@ public class OrFinallyTriggerTest { @Test public void testActualFiresAndFinishes() throws Exception { tester = TriggerTester.forTrigger( - new OrFinallyTrigger<>( + new OrFinallyTrigger( AfterPane.elementCountAtLeast(2), AfterPane.elementCountAtLeast(100)), FixedWindows.of(Duration.millis(100))); @@ -73,7 +73,7 @@ public void testActualFiresAndFinishes() throws Exception { @Test public void testActualFiresOnly() throws Exception { tester = TriggerTester.forTrigger( - new OrFinallyTrigger<>( + new OrFinallyTrigger( Repeatedly.forever(AfterPane.elementCountAtLeast(2)), AfterPane.elementCountAtLeast(100)), FixedWindows.of(Duration.millis(100))); @@ -143,7 +143,7 @@ public void testShouldFireAfterMerge() throws Exception { @Test public void testActualFiresButUntilFinishes() throws Exception { tester = TriggerTester.forTrigger( - new OrFinallyTrigger( + new OrFinallyTrigger( Repeatedly.forever(AfterPane.elementCountAtLeast(2)), AfterPane.elementCountAtLeast(3)), FixedWindows.of(Duration.millis(10))); @@ -194,10 +194,10 @@ public void testFireDeadline() throws Exception { @Test public void testContinuation() throws Exception { - OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane(); - OnceTrigger triggerB = AfterWatermark.pastEndOfWindow(); - Trigger aOrFinallyB = triggerA.orFinally(triggerB); - Trigger bOrFinallyA = triggerB.orFinally(triggerA); + OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane(); + OnceTrigger triggerB = AfterWatermark.pastEndOfWindow(); + Trigger aOrFinallyB = triggerA.orFinally(triggerB); + Trigger bOrFinallyA = triggerB.orFinally(triggerA); assertEquals( Repeatedly.forever( triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())), diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java index 99907b20ebdf..c98090e15a07 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java @@ -43,10 +43,10 @@ @RunWith(JUnit4.class) public class RepeatedlyTest { - @Mock private Trigger mockTrigger; + @Mock private Trigger mockTrigger; private SimpleTriggerTester tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito..TriggerContext>any(); + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.any(); } public void setUp(WindowFn windowFn) throws Exception { @@ -61,7 +61,7 @@ public void setUp(WindowFn windowFn) throws Exception { public void testOnElement() throws Exception { setUp(FixedWindows.of(Duration.millis(10))); tester.injectElements(37); - verify(mockTrigger).onElement(Mockito..OnElementContext>any()); + verify(mockTrigger).onElement(Mockito.any()); } /** @@ -74,7 +74,7 @@ public void testShouldFire() throws Exception { when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); - when(mockTrigger.shouldFire(Mockito..TriggerContext>any())) + when(mockTrigger.shouldFire(Mockito.any())) .thenReturn(false); assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); } @@ -98,8 +98,8 @@ public void testFireDeadline() throws Exception { @Test public void testContinuation() throws Exception { - Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); - Trigger repeatedly = Repeatedly.forever(trigger); + Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); + Trigger repeatedly = Repeatedly.forever(trigger); assertEquals( Repeatedly.forever(trigger.getContinuationTrigger()), repeatedly.getContinuationTrigger()); assertEquals( diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerTest.java index 50b30ee928bc..8bc0826a42f2 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/TriggerTest.java @@ -45,75 +45,75 @@ public void testTriggerToString() throws Exception { @Test public void testIsCompatible() throws Exception { assertTrue(new Trigger1(null).isCompatible(new Trigger1(null))); - assertTrue(new Trigger1(Arrays.>asList(new Trigger2(null))) - .isCompatible(new Trigger1(Arrays.>asList(new Trigger2(null))))); + assertTrue(new Trigger1(Arrays.asList(new Trigger2(null))) + .isCompatible(new Trigger1(Arrays.asList(new Trigger2(null))))); assertFalse(new Trigger1(null).isCompatible(new Trigger2(null))); - assertFalse(new Trigger1(Arrays.>asList(new Trigger1(null))) - .isCompatible(new Trigger1(Arrays.>asList(new Trigger2(null))))); + assertFalse(new Trigger1(Arrays.asList(new Trigger1(null))) + .isCompatible(new Trigger1(Arrays.asList(new Trigger2(null))))); } - private static class Trigger1 extends Trigger { + private static class Trigger1 extends Trigger { - private Trigger1(List> subTriggers) { + private Trigger1(List subTriggers) { super(subTriggers); } @Override - public void onElement(Trigger.OnElementContext c) { } + public void onElement(Trigger.OnElementContext c) { } @Override - public void onMerge(Trigger.OnMergeContext c) { } + public void onMerge(Trigger.OnMergeContext c) { } @Override - protected Trigger getContinuationTrigger( - List> continuationTriggers) { + protected Trigger getContinuationTrigger( + List continuationTriggers) { return null; } @Override - public Instant getWatermarkThatGuaranteesFiring(IntervalWindow window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return null; } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return false; } @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } + public void onFire(Trigger.TriggerContext context) throws Exception { } } - private static class Trigger2 extends Trigger { + private static class Trigger2 extends Trigger { - private Trigger2(List> subTriggers) { + private Trigger2(List subTriggers) { super(subTriggers); } @Override - public void onElement(Trigger.OnElementContext c) { } + public void onElement(Trigger.OnElementContext c) { } @Override - public void onMerge(Trigger.OnMergeContext c) { } + public void onMerge(Trigger.OnMergeContext c) { } @Override - protected Trigger getContinuationTrigger( - List> continuationTriggers) { + protected Trigger getContinuationTrigger( + List continuationTriggers) { return null; } @Override - public Instant getWatermarkThatGuaranteesFiring(IntervalWindow window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return null; } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { return false; } @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } + public void onFire(Trigger.TriggerContext context) throws Exception { } } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowTest.java index c4eda9930551..39a33ccdedc8 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/WindowTest.java @@ -74,7 +74,7 @@ public void testWindowIntoSetWindowfn() { @Test public void testWindowIntoTriggersAndAccumulating() { FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); - Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); + Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); WindowingStrategy strategy = TestPipeline.create() .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply(Window.into(fixed10) @@ -91,7 +91,7 @@ public void testWindowIntoTriggersAndAccumulating() { @Test public void testWindowPropagatesEachPart() { FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); - Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); + Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); WindowingStrategy strategy = TestPipeline.create() .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) .apply("Mode", Window.accumulatingFiredPanes()) @@ -149,7 +149,7 @@ public void testNonDeterministicWindowCoder() throws NonDeterministicException { @Test public void testMissingMode() { FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); - Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); + Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("requires that the accumulation mode"); @@ -163,7 +163,7 @@ public void testMissingMode() { @Test public void testMissingLateness() { FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); - Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); + Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("requires that the allowed lateness"); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ExecutableTriggerTest.java index 16d2640f5718..c975e0807603 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ExecutableTriggerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ExecutableTriggerTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertSame; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; import org.joda.time.Instant; @@ -41,7 +40,7 @@ public class ExecutableTriggerTest { @Test public void testIndexAssignmentLeaf() throws Exception { StubTrigger t1 = new StubTrigger(); - ExecutableTrigger executable = ExecutableTrigger.create(t1); + ExecutableTrigger executable = ExecutableTrigger.create(t1); assertEquals(0, executable.getTriggerIndex()); } @@ -51,7 +50,7 @@ public void testIndexAssignmentOneLevel() throws Exception { StubTrigger t2 = new StubTrigger(); StubTrigger t = new StubTrigger(t1, t2); - ExecutableTrigger executable = ExecutableTrigger.create(t); + ExecutableTrigger executable = ExecutableTrigger.create(t); assertEquals(0, executable.getTriggerIndex()); assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); @@ -72,7 +71,7 @@ public void testIndexAssignmentTwoLevel() throws Exception { StubTrigger t2 = new StubTrigger(t21, t22); StubTrigger t = new StubTrigger(t1, t2); - ExecutableTrigger executable = ExecutableTrigger.create(t); + ExecutableTrigger executable = ExecutableTrigger.create(t); assertEquals(0, executable.getTriggerIndex()); assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); @@ -87,10 +86,10 @@ public void testIndexAssignmentTwoLevel() throws Exception { assertSame(t2, executable.getSubTriggerContaining(7).getSpec()); } - private static class StubTrigger extends Trigger { + private static class StubTrigger extends Trigger { @SafeVarargs - protected StubTrigger(Trigger... subTriggers) { + protected StubTrigger(Trigger... subTriggers) { super(Arrays.asList(subTriggers)); } @@ -105,18 +104,17 @@ public void clear(TriggerContext c) throws Exception { } @Override - public Instant getWatermarkThatGuaranteesFiring(IntervalWindow window) { + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } @Override - public boolean isCompatible(Trigger other) { + public boolean isCompatible(Trigger other) { return false; } @Override - public Trigger getContinuationTrigger( - List> continuationTriggers) { + public Trigger getContinuationTrigger(List continuationTriggers) { return this; } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersProperties.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersProperties.java index 3460f400fd2d..bc2a5147806c 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersProperties.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersProperties.java @@ -34,7 +34,7 @@ public class FinishedTriggersProperties { * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set * finished, it is correctly reported as finished. */ - public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) { + public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) { assertFalse(finishedSet.isFinished(trigger)); finishedSet.setFinished(trigger, true); assertTrue(finishedSet.isFinished(trigger)); @@ -45,7 +45,7 @@ public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTri * reported as finished. */ public static void verifyGetAfterSet(FinishedTriggers finishedSet) { - ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( + ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), AfterAll.of( AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); @@ -63,7 +63,7 @@ public static void verifyGetAfterSet(FinishedTriggers finishedSet) { * others. */ public static void verifyClearRecursively(FinishedTriggers finishedSet) { - ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( + ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of( AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()), AfterAll.of( AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane()))); @@ -85,25 +85,25 @@ public static void verifyClearRecursively(FinishedTriggers finishedSet) { } private static void setFinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { + FinishedTriggers finishedSet, ExecutableTrigger trigger) { finishedSet.setFinished(trigger, true); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { setFinishedRecursively(finishedSet, subTrigger); } } private static void verifyFinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { + FinishedTriggers finishedSet, ExecutableTrigger trigger) { assertTrue(finishedSet.isFinished(trigger)); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { verifyFinishedRecursively(finishedSet, subTrigger); } } private static void verifyUnfinishedRecursively( - FinishedTriggers finishedSet, ExecutableTrigger trigger) { + FinishedTriggers finishedSet, ExecutableTrigger trigger) { assertFalse(finishedSet.isFinished(trigger)); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { + for (ExecutableTrigger subTrigger : trigger.subTriggers()) { verifyUnfinishedRecursively(finishedSet, subTrigger); } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSetTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSetTest.java index ca648aad7163..a84b73ea67c3 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSetTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/FinishedTriggersSetTest.java @@ -38,7 +38,7 @@ public class FinishedTriggersSetTest { @Test public void testSetGet() { FinishedTriggersProperties.verifyGetAfterSet( - FinishedTriggersSet.fromSet(new HashSet>())); + FinishedTriggersSet.fromSet(new HashSet())); } /** @@ -48,13 +48,13 @@ public void testSetGet() { @Test public void testClearRecursively() { FinishedTriggersProperties.verifyClearRecursively( - FinishedTriggersSet.fromSet(new HashSet>())); + FinishedTriggersSet.fromSet(new HashSet())); } @Test public void testCopy() throws Exception { FinishedTriggersSet finishedSet = - FinishedTriggersSet.fromSet(new HashSet>()); + FinishedTriggersSet.fromSet(new HashSet()); assertThat(finishedSet.copy().getFinishedTriggers(), not(theInstance(finishedSet.getFinishedTriggers()))); } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index b2d246e77461..ef499f211ac1 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -85,26 +85,23 @@ @RunWith(JUnit4.class) public class ReduceFnRunnerTest { @Mock private SideInputReader mockSideInputReader; - private Trigger mockTrigger; + private Trigger mockTrigger; private PCollectionView mockView; private IntervalWindow firstWindow; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito..TriggerContext>any(); + private static Trigger.TriggerContext anyTriggerContext() { + return Mockito.any(); } - private static Trigger.OnElementContext anyElementContext() { - return Mockito..OnElementContext>any(); + private static Trigger.OnElementContext anyElementContext() { + return Mockito.any(); } @Before public void setUp() { MockitoAnnotations.initMocks(this); - @SuppressWarnings("unchecked") - Trigger mockTriggerUnchecked = - mock(Trigger.class, withSettings().serializable()); - mockTrigger = mockTriggerUnchecked; + mockTrigger = mock(Trigger.class, withSettings().serializable()); when(mockTrigger.buildTrigger()).thenReturn(mockTrigger); @SuppressWarnings("unchecked") @@ -120,13 +117,13 @@ private void injectElement(ReduceFnTester tester, in tester.injectElements(TimestampedValue.of(element, new Instant(element))); } - private void triggerShouldFinish(Trigger mockTrigger) throws Exception { + private void triggerShouldFinish(Trigger mockTrigger) throws Exception { doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Exception { @SuppressWarnings("unchecked") - Trigger.TriggerContext context = - (Trigger.TriggerContext) invocation.getArguments()[0]; + Trigger.TriggerContext context = + (Trigger.TriggerContext) invocation.getArguments()[0]; context.trigger().setFinished(true); return null; } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java index f10341b8447f..86c2d116b845 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnTester.java @@ -114,7 +114,7 @@ public class ReduceFnTester { */ private boolean autoAdvanceOutputWatermark; - private ExecutableTrigger executableTrigger; + private ExecutableTrigger executableTrigger; private final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); @@ -130,7 +130,7 @@ public class ReduceFnTester { } public static ReduceFnTester, W> - nonCombining(WindowFn windowFn, TriggerBuilder trigger, AccumulationMode mode, + nonCombining(WindowFn windowFn, TriggerBuilder trigger, AccumulationMode mode, Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception { WindowingStrategy strategy = WindowingStrategy.of(windowFn) @@ -180,7 +180,7 @@ public class ReduceFnTester { sideInputReader); } public static ReduceFnTester - combining(WindowFn windowFn, Trigger trigger, AccumulationMode mode, + combining(WindowFn windowFn, Trigger trigger, AccumulationMode mode, KeyedCombineFn combineFn, Coder outputCoder, Duration allowedDataLateness) throws Exception { @@ -228,7 +228,7 @@ ReduceFnRunner createRunner() { options); } - public ExecutableTrigger getTrigger() { + public ExecutableTrigger getTrigger() { return executableTrigger; } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java index 75708c4fbe78..61eeb901cc95 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/TriggerTester.java @@ -109,7 +109,7 @@ public SimpleTriggerTester withAllowedLateness(Duration allowedLateness) thro * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link TriggerBuilder} * under test. */ - private final ExecutableTrigger executableTrigger; + private final ExecutableTrigger executableTrigger; /** * A map from a window and trigger to whether that trigger is finished for the window. @@ -117,7 +117,7 @@ public SimpleTriggerTester withAllowedLateness(Duration allowedLateness) thro private final Map finishedSets; public static SimpleTriggerTester forTrigger( - TriggerBuilder trigger, WindowFn windowFn) + TriggerBuilder trigger, WindowFn windowFn) throws Exception { WindowingStrategy windowingStrategy = WindowingStrategy.of(windowFn).withTrigger(trigger.buildTrigger()) @@ -132,7 +132,7 @@ public static SimpleTriggerTester forTrigger( } public static TriggerTester forAdvancedTrigger( - TriggerBuilder trigger, WindowFn windowFn) throws Exception { + TriggerBuilder trigger, WindowFn windowFn) throws Exception { WindowingStrategy strategy = WindowingStrategy.of(windowFn).withTrigger(trigger.buildTrigger()) // Merging requires accumulation mode or early firings can break up a session. @@ -265,7 +265,7 @@ public final void injectElements(Collection> values) th @SuppressWarnings("unchecked") W window = activeWindows.mergeResultWindow((W) untypedWindow); - Trigger.OnElementContext context = contextFactory.createOnElementContext(window, + Trigger.OnElementContext context = contextFactory.createOnElementContext(window, new TestTimers(windowNamespace(window)), windowedValue.getTimestamp(), executableTrigger, getFinishedSet(window)); @@ -277,7 +277,7 @@ public final void injectElements(Collection> values) th } public boolean shouldFire(W window) throws Exception { - Trigger.TriggerContext context = contextFactory.base( + Trigger.TriggerContext context = contextFactory.base( window, new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window)); @@ -286,7 +286,7 @@ public boolean shouldFire(W window) throws Exception { } public void fireIfShouldFire(W window) throws Exception { - Trigger.TriggerContext context = contextFactory.base( + Trigger.TriggerContext context = contextFactory.base( window, new TestTimers(windowNamespace(window)), executableTrigger, getFinishedSet(window)); @@ -296,7 +296,7 @@ public void fireIfShouldFire(W window) throws Exception { executableTrigger.getSpec().prefetchOnFire(context.state()); executableTrigger.invokeOnFire(context); if (context.trigger().isFinished()) { - activeWindows.remove(context.window()); + activeWindows.remove(window); executableTrigger.invokeClear(context); } } @@ -337,7 +337,7 @@ public void onMerge(Collection toBeMerged, Collection activeToBeMerged, W private FinishedTriggers getFinishedSet(W window) { FinishedTriggers finishedSet = finishedSets.get(window); if (finishedSet == null) { - finishedSet = FinishedTriggersSet.fromSet(new HashSet>()); + finishedSet = FinishedTriggersSet.fromSet(new HashSet()); finishedSets.put(window, finishedSet); } return finishedSet;