Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@

/**
* Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired.
*
* @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
* {@code Trigger}
*/
@Experimental(Experimental.Kind.TRIGGER)
public class AfterAll<W extends BoundedWindow> extends OnceTrigger<W> {
public class AfterAll extends OnceTrigger {

private AfterAll(List<Trigger<W>> subTriggers) {
private AfterAll(List<Trigger> subTriggers) {
super(subTriggers);
Preconditions.checkArgument(subTriggers.size() > 1);
}
Expand All @@ -45,14 +42,13 @@ private AfterAll(List<Trigger<W>> subTriggers) {
* Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
public static <W extends BoundedWindow> OnceTrigger<W> of(
OnceTrigger<W>... triggers) {
return new AfterAll<W>(Arrays.<Trigger<W>>asList(triggers));
public static <W extends BoundedWindow> OnceTrigger of(OnceTrigger... triggers) {
return new AfterAll(Arrays.<Trigger>asList(triggers));
}

@Override
public void onElement(OnElementContext c) throws Exception {
for (ExecutableTrigger<W> subTrigger : c.trigger().unfinishedSubTriggers()) {
for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) {
// Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
// invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
subTrigger.invokeOnElement(c);
Expand All @@ -61,21 +57,21 @@ public void onElement(OnElementContext c) throws Exception {

@Override
public void onMerge(OnMergeContext c) throws Exception {
for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
subTrigger.invokeOnMerge(c);
}
boolean allFinished = true;
for (ExecutableTrigger<W> subTrigger1 : c.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) {
allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
}
c.trigger().setFinished(allFinished);
}

@Override
public Instant getWatermarkThatGuaranteesFiring(W window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger will fire after the latest of its sub-triggers.
Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
for (Trigger<W> subTrigger : subTriggers) {
for (Trigger subTrigger : subTriggers) {
Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
if (deadline.isBefore(subDeadline)) {
deadline = subDeadline;
Expand All @@ -85,8 +81,8 @@ public Instant getWatermarkThatGuaranteesFiring(W window) {
}

@Override
public OnceTrigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
return new AfterAll<W>(continuationTriggers);
public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return new AfterAll(continuationTriggers);
}

/**
Expand All @@ -96,7 +92,7 @@ public OnceTrigger<W> getContinuationTrigger(List<Trigger<W>> continuationTrigge
*/
@Override
public boolean shouldFire(TriggerContext context) throws Exception {
for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
if (!context.forTrigger(subtrigger).trigger().isFinished()
&& !subtrigger.invokeShouldFire(context)) {
return false;
Expand All @@ -111,7 +107,7 @@ public boolean shouldFire(TriggerContext context) throws Exception {
*/
@Override
public void onOnlyFiring(TriggerContext context) throws Exception {
for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
subtrigger.invokeOnFire(context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* <p>This class is for internal use only and may change at any time.
*/
@Experimental(Experimental.Kind.TRIGGER)
public abstract class AfterDelayFromFirstElement<W extends BoundedWindow> extends OnceTrigger<W> {
public abstract class AfterDelayFromFirstElement extends OnceTrigger {

protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
ImmutableList.<SerializableFunction<Instant, Instant>>of();
Expand All @@ -62,14 +62,14 @@ public abstract class AfterDelayFromFirstElement<W extends BoundedWindow> extend
* To complete an implementation, return the desired time from the TriggerContext.
*/
@Nullable
public abstract Instant getCurrentTime(Trigger<W>.TriggerContext context);
public abstract Instant getCurrentTime(Trigger.TriggerContext context);

/**
* To complete an implementation, return a new instance like this one, but incorporating
* the provided timestamp mapping functions. Generally should be used by calling the
* constructor of this class from the constructor of the subclass.
*/
protected abstract AfterDelayFromFirstElement<W> newWith(
protected abstract AfterDelayFromFirstElement newWith(
List<SerializableFunction<Instant, Instant>> transform);

/**
Expand Down Expand Up @@ -100,15 +100,15 @@ private Instant getTargetTimestamp(OnElementContext c) {
* <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of
* CalendarWindows.
*/
public AfterDelayFromFirstElement<W> alignedTo(final Duration size, final Instant offset) {
public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) {
return newWith(new AlignFn(size, offset));
}

/**
* Aligns the time to be the smallest multiple of {@code size} greater than the timestamp
* since the epoch.
*/
public AfterDelayFromFirstElement<W> alignedTo(final Duration size) {
public AfterDelayFromFirstElement alignedTo(final Duration size) {
return alignedTo(size, new Instant(0));
}

Expand All @@ -118,7 +118,7 @@ public AfterDelayFromFirstElement<W> alignedTo(final Duration size) {
* @param delay the delay to add
* @return An updated time trigger that will wait the additional time before firing.
*/
public AfterDelayFromFirstElement<W> plusDelayOf(final Duration delay) {
public AfterDelayFromFirstElement plusDelayOf(final Duration delay) {
return newWith(new DelayFn(delay));
}

Expand All @@ -127,22 +127,22 @@ public AfterDelayFromFirstElement<W> plusDelayOf(final Duration delay) {
* {@link #plusDelayOf} and {@link #alignedTo}.
*/
@Deprecated
public OnceTrigger<W> mappedTo(SerializableFunction<Instant, Instant> timestampMapper) {
public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> timestampMapper) {
return newWith(timestampMapper);
}

@Override
public boolean isCompatible(Trigger<?> other) {
public boolean isCompatible(Trigger other) {
if (!getClass().equals(other.getClass())) {
return false;
}

AfterDelayFromFirstElement<?> that = (AfterDelayFromFirstElement<?>) other;
AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other;
return this.timestampMappers.equals(that.timestampMappers);
}


private AfterDelayFromFirstElement<W> newWith(
private AfterDelayFromFirstElement newWith(
SerializableFunction<Instant, Instant> timestampMapper) {
return newWith(
ImmutableList.<SerializableFunction<Instant, Instant>>builder()
Expand Down Expand Up @@ -173,7 +173,7 @@ public void onElement(OnElementContext c) throws Exception {
}

@Override
public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
public void prefetchOnMerge(MergingStateAccessor<?, ?> state) {
super.prefetchOnMerge(state);
StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
}
Expand Down Expand Up @@ -218,20 +218,20 @@ public void clear(TriggerContext c) throws Exception {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(W window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}

@Override
public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
return delayedUntil != null
&& getCurrentTime(context) != null
&& getCurrentTime(context).isAfter(delayedUntil);
}

@Override
protected void onOnlyFiring(Trigger<W>.TriggerContext context) throws Exception {
protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
clear(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,11 @@
* <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as
* {@code Repeatedly.forever(a)}, since the repeated trigger never finishes.
* </ul>
*
* @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
* {@code Trigger}
*/
@Experimental(Experimental.Kind.TRIGGER)
public class AfterEach<W extends BoundedWindow> extends Trigger<W> {
public class AfterEach extends Trigger {

private AfterEach(List<Trigger<W>> subTriggers) {
private AfterEach(List<Trigger> subTriggers) {
super(subTriggers);
checkArgument(subTriggers.size() > 1);
}
Expand All @@ -58,8 +55,8 @@ private AfterEach(List<Trigger<W>> subTriggers) {
* Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
public static <W extends BoundedWindow> Trigger<W> inOrder(Trigger<W>... triggers) {
return new AfterEach<W>(Arrays.<Trigger<W>>asList(triggers));
public static <W extends BoundedWindow> Trigger inOrder(Trigger... triggers) {
return new AfterEach(Arrays.<Trigger>asList(triggers));
}

@Override
Expand All @@ -69,7 +66,7 @@ public void onElement(OnElementContext c) throws Exception {
c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
} else {
// If merges are possible, we need to run all subtriggers in parallel
for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
// Even if the subTrigger is done, it may be revived via merging and must have
// adequate state.
subTrigger.invokeOnElement(c);
Expand All @@ -86,7 +83,7 @@ public void onMerge(OnMergeContext context) throws Exception {
// also automatic because they are cleared whenever this trigger
// fires.
boolean priorTriggersAllFinished = true;
for (ExecutableTrigger<W> subTrigger : context.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
if (priorTriggersAllFinished) {
subTrigger.invokeOnMerge(context);
priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
Expand All @@ -98,31 +95,31 @@ public void onMerge(OnMergeContext context) throws Exception {
}

@Override
public Instant getWatermarkThatGuaranteesFiring(W window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger will fire at least once when the first trigger in the sequence
// fires at least once.
return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
}

@Override
public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
return Repeatedly.forever(new AfterFirst<W>(continuationTriggers));
public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return Repeatedly.forever(new AfterFirst(continuationTriggers));
}

@Override
public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
ExecutableTrigger<W> firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
return firstUnfinished.invokeShouldFire(context);
}

@Override
public void onFire(Trigger<W>.TriggerContext context) throws Exception {
public void onFire(Trigger.TriggerContext context) throws Exception {
context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);

// Reset all subtriggers if in a merging context; any may be revived by merging so they are
// all run in parallel for each pending pane.
if (context.trigger().isMerging()) {
for (ExecutableTrigger<W> subTrigger : context.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) {
subTrigger.invokeClear(context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@
/**
* Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have
* fired.
*
* @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
* {@code Trigger}
*/
@Experimental(Experimental.Kind.TRIGGER)
public class AfterFirst<W extends BoundedWindow> extends OnceTrigger<W> {
public class AfterFirst extends OnceTrigger {

AfterFirst(List<Trigger<W>> subTriggers) {
AfterFirst(List<Trigger> subTriggers) {
super(subTriggers);
Preconditions.checkArgument(subTriggers.size() > 1);
}
Expand All @@ -46,31 +43,31 @@ public class AfterFirst<W extends BoundedWindow> extends OnceTrigger<W> {
* Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
public static <W extends BoundedWindow> OnceTrigger<W> of(
OnceTrigger<W>... triggers) {
return new AfterFirst<W>(Arrays.<Trigger<W>>asList(triggers));
public static <W extends BoundedWindow> OnceTrigger of(
OnceTrigger... triggers) {
return new AfterFirst(Arrays.<Trigger>asList(triggers));
}

@Override
public void onElement(OnElementContext c) throws Exception {
for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
subTrigger.invokeOnElement(c);
}
}

@Override
public void onMerge(OnMergeContext c) throws Exception {
for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
subTrigger.invokeOnMerge(c);
}
updateFinishedStatus(c);
}

@Override
public Instant getWatermarkThatGuaranteesFiring(W window) {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
// This trigger will fire after the earliest of its sub-triggers.
Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (Trigger<W> subTrigger : subTriggers) {
for (Trigger subTrigger : subTriggers) {
Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
if (deadline.isAfter(subDeadline)) {
deadline = subDeadline;
Expand All @@ -80,13 +77,13 @@ public Instant getWatermarkThatGuaranteesFiring(W window) {
}

@Override
public OnceTrigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
return new AfterFirst<W>(continuationTriggers);
public OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return new AfterFirst(continuationTriggers);
}

@Override
public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
if (context.forTrigger(subtrigger).trigger().isFinished()
|| subtrigger.invokeShouldFire(context)) {
return true;
Expand All @@ -97,7 +94,7 @@ public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {

@Override
protected void onOnlyFiring(TriggerContext context) throws Exception {
for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) {
TriggerContext subContext = context.forTrigger(subtrigger);
if (subtrigger.invokeShouldFire(subContext)) {
// If the trigger is ready to fire, then do whatever it needs to do.
Expand All @@ -112,7 +109,7 @@ protected void onOnlyFiring(TriggerContext context) throws Exception {

private void updateFinishedStatus(TriggerContext c) {
boolean anyFinished = false;
for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
}
c.trigger().setFinished(anyFinished);
Expand Down
Loading