Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private AfterAll(List<Trigger> subTriggers) {
* Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
public static <W extends BoundedWindow> OnceTrigger of(OnceTrigger... triggers) {
public static OnceTrigger of(OnceTrigger... triggers) {
return new AfterAll(Arrays.<Trigger>asList(triggers));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private AfterEach(List<Trigger> subTriggers) {
* Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
public static <W extends BoundedWindow> Trigger inOrder(Trigger... triggers) {
public static Trigger inOrder(Trigger... triggers) {
return new AfterEach(Arrays.<Trigger>asList(triggers));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class AfterFirst extends OnceTrigger {
* Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
public static <W extends BoundedWindow> OnceTrigger of(
public static OnceTrigger of(
OnceTrigger... triggers) {
return new AfterFirst(Arrays.<Trigger>asList(triggers));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transfo
* Creates a trigger that fires when the current processing time passes the processing time
* at which this trigger saw the first element in a pane.
*/
public static <W extends BoundedWindow> AfterProcessingTime pastFirstElementInPane() {
public static AfterProcessingTime pastFirstElementInPane() {
return new AfterProcessingTime(IDENTITY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private DefaultTrigger() {
/**
* Returns the default trigger.
*/
public static <W extends BoundedWindow> DefaultTrigger of() {
public static DefaultTrigger of() {
return new DefaultTrigger();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class Repeatedly extends Trigger {
*
* @param repeated the trigger to execute repeatedly.
*/
public static <W extends BoundedWindow> Repeatedly forever(Trigger repeated) {
public static Repeatedly forever(Trigger repeated) {
return new Repeatedly(repeated);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class AfterAllTest {
public void testT1FiresFirst() throws Exception {
tester = TriggerTester.forTrigger(
AfterAll.of(
AfterPane.<IntervalWindow>elementCountAtLeast(1),
AfterPane.<IntervalWindow>elementCountAtLeast(2)),
AfterPane.elementCountAtLeast(1),
AfterPane.elementCountAtLeast(2)),
FixedWindows.of(Duration.millis(100)));

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
Expand All @@ -62,8 +62,8 @@ public void testT1FiresFirst() throws Exception {
public void testT2FiresFirst() throws Exception {
tester = TriggerTester.forTrigger(
AfterAll.of(
AfterPane.<IntervalWindow>elementCountAtLeast(2),
AfterPane.<IntervalWindow>elementCountAtLeast(1)),
AfterPane.elementCountAtLeast(2),
AfterPane.elementCountAtLeast(1)),
FixedWindows.of(Duration.millis(100)));

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
Expand All @@ -86,9 +86,9 @@ public void testOnMergeRewinds() throws Exception {
tester = TriggerTester.forTrigger(
AfterEach.inOrder(
AfterAll.of(
AfterWatermark.<IntervalWindow>pastEndOfWindow(),
AfterPane.<IntervalWindow>elementCountAtLeast(1)),
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(1))),
AfterWatermark.pastEndOfWindow(),
AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
Sessions.withGapDuration(Duration.millis(10)));

tester.injectElements(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public void initMocks() {
public void testAfterEachInSequence() throws Exception {
tester = TriggerTester.forTrigger(
AfterEach.inOrder(
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(2))
.orFinally(AfterPane.<IntervalWindow>elementCountAtLeast(3)),
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(5))
.orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow())),
Repeatedly.forever(AfterPane.elementCountAtLeast(2))
.orFinally(AfterPane.elementCountAtLeast(3)),
Repeatedly.forever(AfterPane.elementCountAtLeast(5))
.orFinally(AfterWatermark.pastEndOfWindow())),
FixedWindows.of(Duration.millis(10)));

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public void testBothShouldFireFixedWindows() throws Exception {
public void testShouldFireAfterMerge() throws Exception {
tester = TriggerTester.forTrigger(
AfterEach.inOrder(
AfterFirst.of(AfterPane.<IntervalWindow>elementCountAtLeast(5),
AfterWatermark.<IntervalWindow>pastEndOfWindow()),
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(1))),
AfterFirst.of(AfterPane.elementCountAtLeast(5),
AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
Sessions.withGapDuration(Duration.millis(10)));

// Finished the AfterFirst in the first window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AfterPaneTest {
@Test
public void testAfterPaneElementCountFixedWindows() throws Exception {
tester = TriggerTester.forTrigger(
AfterPane.<IntervalWindow>elementCountAtLeast(2),
AfterPane.elementCountAtLeast(2),
FixedWindows.of(Duration.millis(10)));

tester.injectElements(1); // [0, 10)
Expand All @@ -65,7 +65,7 @@ public void testAfterPaneElementCountFixedWindows() throws Exception {
@Test
public void testClear() throws Exception {
SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
AfterPane.<IntervalWindow>elementCountAtLeast(2),
AfterPane.elementCountAtLeast(2),
FixedWindows.of(Duration.millis(10)));

tester.injectElements(1, 2, 3);
Expand All @@ -77,7 +77,7 @@ public void testClear() throws Exception {
@Test
public void testAfterPaneElementCountSessions() throws Exception {
tester = TriggerTester.forTrigger(
AfterPane.<IntervalWindow>elementCountAtLeast(2),
AfterPane.elementCountAtLeast(2),
Sessions.withGapDuration(Duration.millis(10)));

tester.injectElements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testAfterProcessingTimeFixedWindows() throws Exception {
Duration windowDuration = Duration.millis(10);
SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
AfterProcessingTime
.<IntervalWindow>pastFirstElementInPane()
.pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)),
FixedWindows.of(windowDuration));

Expand Down Expand Up @@ -93,7 +93,7 @@ public void testAfterProcessingTimeFixedWindows() throws Exception {
public void testClear() throws Exception {
SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
AfterProcessingTime
.<IntervalWindow>pastFirstElementInPane()
.pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)),
FixedWindows.of(Duration.millis(10)));

Expand All @@ -107,7 +107,7 @@ public void testClear() throws Exception {
public void testAfterProcessingTimeWithMergingWindow() throws Exception {
SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
AfterProcessingTime
.<IntervalWindow>pastFirstElementInPane()
.pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)),
Sessions.withGapDuration(Duration.millis(10)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testAfterProcessingTimeWithFixedWindows() throws Exception {
Duration windowDuration = Duration.millis(10);
SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
AfterProcessingTime
.<IntervalWindow>pastFirstElementInPane()
.pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)),
FixedWindows.of(windowDuration));

Expand Down Expand Up @@ -87,7 +87,7 @@ public void testAfterProcessingTimeWithMergingWindow() throws Exception {
Duration windowDuration = Duration.millis(10);
SimpleTriggerTester<IntervalWindow> tester = TriggerTester.forTrigger(
AfterProcessingTime
.<IntervalWindow>pastFirstElementInPane()
.pastFirstElementInPane()
.plusDelayOf(Duration.millis(5)),
Sessions.withGapDuration(windowDuration));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window)
@Test
public void testEarlyAndAtWatermark() throws Exception {
tester = TriggerTester.forTrigger(
AfterWatermark.<IntervalWindow>pastEndOfWindow()
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(mockEarly),
FixedWindows.of(Duration.millis(100)));

Expand All @@ -104,7 +104,7 @@ public void testEarlyAndAtWatermark() throws Exception {
@Test
public void testAtWatermarkAndLate() throws Exception {
tester = TriggerTester.forTrigger(
AfterWatermark.<IntervalWindow>pastEndOfWindow()
AfterWatermark.pastEndOfWindow()
.withLateFirings(mockLate),
FixedWindows.of(Duration.millis(100)));

Expand All @@ -130,7 +130,7 @@ public void testAtWatermarkAndLate() throws Exception {
@Test
public void testEarlyAndAtWatermarkAndLate() throws Exception {
tester = TriggerTester.forTrigger(
AfterWatermark.<IntervalWindow>pastEndOfWindow()
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(mockEarly)
.withLateFirings(mockLate),
FixedWindows.of(Duration.millis(100)));
Expand Down Expand Up @@ -162,8 +162,8 @@ public void testEarlyAndAtWatermarkAndLate() throws Exception {
public void testOnMergeAlreadyFinished() throws Exception {
tester = TriggerTester.forTrigger(
AfterEach.inOrder(
AfterWatermark.<IntervalWindow>pastEndOfWindow(),
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(1))),
AfterWatermark.pastEndOfWindow(),
Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
Sessions.withGapDuration(Duration.millis(10)));

tester.injectElements(1);
Expand Down Expand Up @@ -209,8 +209,8 @@ public void testOnMergeAlreadyFinished() throws Exception {
public void testOnMergeRewinds() throws Exception {
tester = TriggerTester.forTrigger(
AfterEach.inOrder(
AfterWatermark.<IntervalWindow>pastEndOfWindow(),
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(1))),
AfterWatermark.pastEndOfWindow(),
Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
Sessions.withGapDuration(Duration.millis(10)));

tester.injectElements(1);
Expand Down Expand Up @@ -255,9 +255,9 @@ public void testOnMergeRewinds() throws Exception {
@Test
public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
tester = TriggerTester.forTrigger(
AfterWatermark.<IntervalWindow>pastEndOfWindow()
.withEarlyFirings(AfterPane.<IntervalWindow>elementCountAtLeast(100))
.withLateFirings(AfterPane.<IntervalWindow>elementCountAtLeast(1)),
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(100))
.withLateFirings(AfterPane.elementCountAtLeast(1)),
Sessions.withGapDuration(Duration.millis(10)));

tester.injectElements(1);
Expand Down Expand Up @@ -302,9 +302,9 @@ public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception {
@Test
public void testEarlyAndLateOnMergeRewinds() throws Exception {
tester = TriggerTester.forTrigger(
AfterWatermark.<IntervalWindow>pastEndOfWindow()
.withEarlyFirings(AfterPane.<IntervalWindow>elementCountAtLeast(100))
.withLateFirings(AfterPane.<IntervalWindow>elementCountAtLeast(1)),
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(100))
.withLateFirings(AfterPane.elementCountAtLeast(1)),
Sessions.withGapDuration(Duration.millis(10)));

tester.injectElements(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class DefaultTriggerTest {
@Test
public void testDefaultTriggerFixedWindows() throws Exception {
tester = TriggerTester.forTrigger(
DefaultTrigger.<IntervalWindow>of(),
DefaultTrigger.of(),
FixedWindows.of(Duration.millis(100)));

tester.injectElements(
Expand Down Expand Up @@ -79,7 +79,7 @@ public void testDefaultTriggerFixedWindows() throws Exception {
@Test
public void testDefaultTriggerSlidingWindows() throws Exception {
tester = TriggerTester.forTrigger(
DefaultTrigger.<IntervalWindow>of(),
DefaultTrigger.of(),
SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50)));

tester.injectElements(
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testDefaultTriggerSlidingWindows() throws Exception {
@Test
public void testDefaultTriggerSessions() throws Exception {
tester = TriggerTester.forTrigger(
DefaultTrigger.<IntervalWindow>of(),
DefaultTrigger.of(),
Sessions.withGapDuration(Duration.millis(100)));

tester.injectElements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class NeverTest {
public void setup() throws Exception {
triggerTester =
TriggerTester.forTrigger(
Never.<IntervalWindow>ever(), FixedWindows.of(Duration.standardMinutes(5)));
Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class OrFinallyTriggerTest {
public void testActualFiresAndFinishes() throws Exception {
tester = TriggerTester.forTrigger(
new OrFinallyTrigger(
AfterPane.<IntervalWindow>elementCountAtLeast(2),
AfterPane.<IntervalWindow>elementCountAtLeast(100)),
AfterPane.elementCountAtLeast(2),
AfterPane.elementCountAtLeast(100)),
FixedWindows.of(Duration.millis(100)));

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
Expand All @@ -74,8 +74,8 @@ public void testActualFiresAndFinishes() throws Exception {
public void testActualFiresOnly() throws Exception {
tester = TriggerTester.forTrigger(
new OrFinallyTrigger(
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(2)),
AfterPane.<IntervalWindow>elementCountAtLeast(100)),
Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
AfterPane.elementCountAtLeast(100)),
FixedWindows.of(Duration.millis(100)));

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
Expand Down Expand Up @@ -106,9 +106,9 @@ public void testActualFiresOnly() throws Exception {
public void testShouldFireAfterMerge() throws Exception {
tester = TriggerTester.forTrigger(
AfterEach.inOrder(
AfterPane.<IntervalWindow>elementCountAtLeast(5)
.orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow()),
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(1))),
AfterPane.elementCountAtLeast(5)
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
Sessions.withGapDuration(Duration.millis(10)));

// Finished the orFinally in the first window
Expand Down Expand Up @@ -144,8 +144,8 @@ public void testShouldFireAfterMerge() throws Exception {
public void testActualFiresButUntilFinishes() throws Exception {
tester = TriggerTester.forTrigger(
new OrFinallyTrigger(
Repeatedly.forever(AfterPane.<IntervalWindow>elementCountAtLeast(2)),
AfterPane.<IntervalWindow>elementCountAtLeast(3)),
Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
AfterPane.elementCountAtLeast(3)),
FixedWindows.of(Duration.millis(10)));

IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));
Expand Down
Loading