diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index 0f609dfc0337..e6d8e84a6747 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -44,7 +44,7 @@ private AfterAll(List subTriggers) { * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static OnceTrigger of(OnceTrigger... triggers) { + public static OnceTrigger of(OnceTrigger... triggers) { return new AfterAll(Arrays.asList(triggers)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 59cb73ce150c..bd57339dd86c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -56,7 +56,7 @@ private AfterEach(List subTriggers) { * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static Trigger inOrder(Trigger... triggers) { + public static Trigger inOrder(Trigger... triggers) { return new AfterEach(Arrays.asList(triggers)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index a8508a3fc8ce..a43d9ace74c4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -45,7 +45,7 @@ public class AfterFirst extends OnceTrigger { * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static OnceTrigger of( + public static OnceTrigger of( OnceTrigger... triggers) { return new AfterFirst(Arrays.asList(triggers)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index 05c681562ec4..563455bcbf0d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -51,7 +51,7 @@ private AfterProcessingTime(List> transfo * Creates a trigger that fires when the current processing time passes the processing time * at which this trigger saw the first element in a pane. */ - public static AfterProcessingTime pastFirstElementInPane() { + public static AfterProcessingTime pastFirstElementInPane() { return new AfterProcessingTime(IDENTITY); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java index 3c6dbf3f6af5..fcea3337b8a4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java @@ -38,7 +38,7 @@ private DefaultTrigger() { /** * Returns the default trigger. */ - public static DefaultTrigger of() { + public static DefaultTrigger of() { return new DefaultTrigger(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index ec79cf93dabf..591bbf06a0ea 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -47,7 +47,7 @@ public class Repeatedly extends Trigger { * * @param repeated the trigger to execute repeatedly. */ - public static Repeatedly forever(Trigger repeated) { + public static Repeatedly forever(Trigger repeated) { return new Repeatedly(repeated); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java index 969c1fe722cf..b7980132cb5f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java @@ -43,8 +43,8 @@ public class AfterAllTest { public void testT1FiresFirst() throws Exception { tester = TriggerTester.forTrigger( AfterAll.of( - AfterPane.elementCountAtLeast(1), - AfterPane.elementCountAtLeast(2)), + AfterPane.elementCountAtLeast(1), + AfterPane.elementCountAtLeast(2)), FixedWindows.of(Duration.millis(100))); IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); @@ -62,8 +62,8 @@ public void testT1FiresFirst() throws Exception { public void testT2FiresFirst() throws Exception { tester = TriggerTester.forTrigger( AfterAll.of( - AfterPane.elementCountAtLeast(2), - AfterPane.elementCountAtLeast(1)), + AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(1)), FixedWindows.of(Duration.millis(100))); IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); @@ -86,9 +86,9 @@ public void testOnMergeRewinds() throws Exception { tester = TriggerTester.forTrigger( AfterEach.inOrder( AfterAll.of( - AfterWatermark.pastEndOfWindow(), - AfterPane.elementCountAtLeast(1)), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + AfterWatermark.pastEndOfWindow(), + AfterPane.elementCountAtLeast(1)), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), Sessions.withGapDuration(Duration.millis(10))); tester.injectElements(1); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java index f5d83a708b3f..df557eb30132 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java @@ -53,10 +53,10 @@ public void initMocks() { public void testAfterEachInSequence() throws Exception { tester = TriggerTester.forTrigger( AfterEach.inOrder( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)) - .orFinally(AfterPane.elementCountAtLeast(3)), - Repeatedly.forever(AfterPane.elementCountAtLeast(5)) - .orFinally(AfterWatermark.pastEndOfWindow())), + Repeatedly.forever(AfterPane.elementCountAtLeast(2)) + .orFinally(AfterPane.elementCountAtLeast(3)), + Repeatedly.forever(AfterPane.elementCountAtLeast(5)) + .orFinally(AfterWatermark.pastEndOfWindow())), FixedWindows.of(Duration.millis(10))); IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java index c0a9f2be3d02..a16669425dd1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java @@ -123,9 +123,9 @@ public void testBothShouldFireFixedWindows() throws Exception { public void testShouldFireAfterMerge() throws Exception { tester = TriggerTester.forTrigger( AfterEach.inOrder( - AfterFirst.of(AfterPane.elementCountAtLeast(5), - AfterWatermark.pastEndOfWindow()), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + AfterFirst.of(AfterPane.elementCountAtLeast(5), + AfterWatermark.pastEndOfWindow()), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), Sessions.withGapDuration(Duration.millis(10))); // Finished the AfterFirst in the first window diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java index 827d4c63907f..76ee49c0b402 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java @@ -44,7 +44,7 @@ public class AfterPaneTest { @Test public void testAfterPaneElementCountFixedWindows() throws Exception { tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(2), FixedWindows.of(Duration.millis(10))); tester.injectElements(1); // [0, 10) @@ -65,7 +65,7 @@ public void testAfterPaneElementCountFixedWindows() throws Exception { @Test public void testClear() throws Exception { SimpleTriggerTester tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(2), FixedWindows.of(Duration.millis(10))); tester.injectElements(1, 2, 3); @@ -77,7 +77,7 @@ public void testClear() throws Exception { @Test public void testAfterPaneElementCountSessions() throws Exception { tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(2), Sessions.withGapDuration(Duration.millis(10))); tester.injectElements( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java index 81aad3389ff5..8178d549e1fa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -46,7 +46,7 @@ public void testAfterProcessingTimeFixedWindows() throws Exception { Duration windowDuration = Duration.millis(10); SimpleTriggerTester tester = TriggerTester.forTrigger( AfterProcessingTime - .pastFirstElementInPane() + .pastFirstElementInPane() .plusDelayOf(Duration.millis(5)), FixedWindows.of(windowDuration)); @@ -93,7 +93,7 @@ public void testAfterProcessingTimeFixedWindows() throws Exception { public void testClear() throws Exception { SimpleTriggerTester tester = TriggerTester.forTrigger( AfterProcessingTime - .pastFirstElementInPane() + .pastFirstElementInPane() .plusDelayOf(Duration.millis(5)), FixedWindows.of(Duration.millis(10))); @@ -107,7 +107,7 @@ public void testClear() throws Exception { public void testAfterProcessingTimeWithMergingWindow() throws Exception { SimpleTriggerTester tester = TriggerTester.forTrigger( AfterProcessingTime - .pastFirstElementInPane() + .pastFirstElementInPane() .plusDelayOf(Duration.millis(5)), Sessions.withGapDuration(Duration.millis(10))); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java index a44be90b1fc2..4c089db19c64 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java @@ -43,7 +43,7 @@ public void testAfterProcessingTimeWithFixedWindows() throws Exception { Duration windowDuration = Duration.millis(10); SimpleTriggerTester tester = TriggerTester.forTrigger( AfterProcessingTime - .pastFirstElementInPane() + .pastFirstElementInPane() .plusDelayOf(Duration.millis(5)), FixedWindows.of(windowDuration)); @@ -87,7 +87,7 @@ public void testAfterProcessingTimeWithMergingWindow() throws Exception { Duration windowDuration = Duration.millis(10); SimpleTriggerTester tester = TriggerTester.forTrigger( AfterProcessingTime - .pastFirstElementInPane() + .pastFirstElementInPane() .plusDelayOf(Duration.millis(5)), Sessions.withGapDuration(windowDuration)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java index ef8471407521..be0ec1cb7d78 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java @@ -84,7 +84,7 @@ public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window) @Test public void testEarlyAndAtWatermark() throws Exception { tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() + AfterWatermark.pastEndOfWindow() .withEarlyFirings(mockEarly), FixedWindows.of(Duration.millis(100))); @@ -104,7 +104,7 @@ public void testEarlyAndAtWatermark() throws Exception { @Test public void testAtWatermarkAndLate() throws Exception { tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() + AfterWatermark.pastEndOfWindow() .withLateFirings(mockLate), FixedWindows.of(Duration.millis(100))); @@ -130,7 +130,7 @@ public void testAtWatermarkAndLate() throws Exception { @Test public void testEarlyAndAtWatermarkAndLate() throws Exception { tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() + AfterWatermark.pastEndOfWindow() .withEarlyFirings(mockEarly) .withLateFirings(mockLate), FixedWindows.of(Duration.millis(100))); @@ -162,8 +162,8 @@ public void testEarlyAndAtWatermarkAndLate() throws Exception { public void testOnMergeAlreadyFinished() throws Exception { tester = TriggerTester.forTrigger( AfterEach.inOrder( - AfterWatermark.pastEndOfWindow(), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + AfterWatermark.pastEndOfWindow(), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), Sessions.withGapDuration(Duration.millis(10))); tester.injectElements(1); @@ -209,8 +209,8 @@ public void testOnMergeAlreadyFinished() throws Exception { public void testOnMergeRewinds() throws Exception { tester = TriggerTester.forTrigger( AfterEach.inOrder( - AfterWatermark.pastEndOfWindow(), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + AfterWatermark.pastEndOfWindow(), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), Sessions.withGapDuration(Duration.millis(10))); tester.injectElements(1); @@ -255,9 +255,9 @@ public void testOnMergeRewinds() throws Exception { @Test public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception { tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(100)) - .withLateFirings(AfterPane.elementCountAtLeast(1)), + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(100)) + .withLateFirings(AfterPane.elementCountAtLeast(1)), Sessions.withGapDuration(Duration.millis(10))); tester.injectElements(1); @@ -302,9 +302,9 @@ public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception { @Test public void testEarlyAndLateOnMergeRewinds() throws Exception { tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(100)) - .withLateFirings(AfterPane.elementCountAtLeast(1)), + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(100)) + .withLateFirings(AfterPane.elementCountAtLeast(1)), Sessions.withGapDuration(Duration.millis(10))); tester.injectElements(1); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java index b31ad56bc015..6ed1c8190df0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java @@ -42,7 +42,7 @@ public class DefaultTriggerTest { @Test public void testDefaultTriggerFixedWindows() throws Exception { tester = TriggerTester.forTrigger( - DefaultTrigger.of(), + DefaultTrigger.of(), FixedWindows.of(Duration.millis(100))); tester.injectElements( @@ -79,7 +79,7 @@ public void testDefaultTriggerFixedWindows() throws Exception { @Test public void testDefaultTriggerSlidingWindows() throws Exception { tester = TriggerTester.forTrigger( - DefaultTrigger.of(), + DefaultTrigger.of(), SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50))); tester.injectElements( @@ -125,7 +125,7 @@ public void testDefaultTriggerSlidingWindows() throws Exception { @Test public void testDefaultTriggerSessions() throws Exception { tester = TriggerTester.forTrigger( - DefaultTrigger.of(), + DefaultTrigger.of(), Sessions.withGapDuration(Duration.millis(100))); tester.injectElements( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java index 222fe4ef8406..fb2b4d5c730d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java @@ -41,7 +41,7 @@ public class NeverTest { public void setup() throws Exception { triggerTester = TriggerTester.forTrigger( - Never.ever(), FixedWindows.of(Duration.standardMinutes(5))); + Never.ever(), FixedWindows.of(Duration.standardMinutes(5))); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java index ea178a8b3128..93971793cfe2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java @@ -47,8 +47,8 @@ public class OrFinallyTriggerTest { public void testActualFiresAndFinishes() throws Exception { tester = TriggerTester.forTrigger( new OrFinallyTrigger( - AfterPane.elementCountAtLeast(2), - AfterPane.elementCountAtLeast(100)), + AfterPane.elementCountAtLeast(2), + AfterPane.elementCountAtLeast(100)), FixedWindows.of(Duration.millis(100))); IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); @@ -74,8 +74,8 @@ public void testActualFiresAndFinishes() throws Exception { public void testActualFiresOnly() throws Exception { tester = TriggerTester.forTrigger( new OrFinallyTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - AfterPane.elementCountAtLeast(100)), + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + AfterPane.elementCountAtLeast(100)), FixedWindows.of(Duration.millis(100))); IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); @@ -106,9 +106,9 @@ public void testActualFiresOnly() throws Exception { public void testShouldFireAfterMerge() throws Exception { tester = TriggerTester.forTrigger( AfterEach.inOrder( - AfterPane.elementCountAtLeast(5) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), + AfterPane.elementCountAtLeast(5) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever(AfterPane.elementCountAtLeast(1))), Sessions.withGapDuration(Duration.millis(10))); // Finished the orFinally in the first window @@ -144,8 +144,8 @@ public void testShouldFireAfterMerge() throws Exception { public void testActualFiresButUntilFinishes() throws Exception { tester = TriggerTester.forTrigger( new OrFinallyTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - AfterPane.elementCountAtLeast(3)), + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + AfterPane.elementCountAtLeast(3)), FixedWindows.of(Duration.millis(10))); IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java index ddb9f9a39b77..3a33182f0f99 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java @@ -110,7 +110,7 @@ public void testContinuation() throws Exception { @Test public void testShouldFireAfterMerge() throws Exception { tester = TriggerTester.forTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), + Repeatedly.forever(AfterPane.elementCountAtLeast(2)), Sessions.withGapDuration(Duration.millis(10))); tester.injectElements(1); @@ -132,10 +132,10 @@ public void testRepeatedlyAfterFirstElementCount() throws Exception { SimpleTriggerTester tester = TriggerTester.forTrigger( Repeatedly.forever( - AfterFirst.of( - AfterProcessingTime.pastFirstElementInPane() + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(15)), - AfterPane.elementCountAtLeast(5))), + AfterPane.elementCountAtLeast(5))), new GlobalWindows()); GlobalWindow window = GlobalWindow.INSTANCE; @@ -154,10 +154,10 @@ public void testRepeatedlyAfterFirstProcessingTime() throws Exception { SimpleTriggerTester tester = TriggerTester.forTrigger( Repeatedly.forever( - AfterFirst.of( - AfterProcessingTime.pastFirstElementInPane() + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(15)), - AfterPane.elementCountAtLeast(5))), + AfterPane.elementCountAtLeast(5))), new GlobalWindows()); GlobalWindow window = GlobalWindow.INSTANCE; @@ -175,7 +175,7 @@ public void testRepeatedlyAfterFirstProcessingTime() throws Exception { public void testRepeatedlyElementCount() throws Exception { SimpleTriggerTester tester = TriggerTester.forTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(5)), + Repeatedly.forever(AfterPane.elementCountAtLeast(5)), new GlobalWindows()); GlobalWindow window = GlobalWindow.INSTANCE; @@ -194,7 +194,7 @@ public void testRepeatedlyProcessingTime() throws Exception { SimpleTriggerTester tester = TriggerTester.forTrigger( Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() + AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(15))), new GlobalWindows()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 65b5ee68fd7a..f2036ebe2301 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -499,7 +499,7 @@ public void testPaneInfoAllStates() throws Exception { public void testPaneInfoAllStatesAfterWatermark() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withTrigger(Repeatedly.forever(AfterFirst.of( + .withTrigger(Repeatedly.forever(AfterFirst.of( AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) @@ -605,7 +605,7 @@ public void noEmptyPanesFinalAlways() throws Exception { public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withTrigger(Repeatedly.forever(AfterFirst.of( + .withTrigger(Repeatedly.forever(AfterFirst.of( AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) @@ -658,7 +658,7 @@ public void testPaneInfoFinalAndOnTime() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) .withTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)) + Repeatedly.forever(AfterPane.elementCountAtLeast(2)) .orFinally(AfterWatermark.pastEndOfWindow())) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) @@ -1075,12 +1075,12 @@ public void testEmptyOnTimeFromOrFinally() throws Exception { ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), AfterEach.inOrder( Repeatedly - .forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + .forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(5))) .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(25)))), AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of(), Duration.millis(100)); @@ -1125,12 +1125,12 @@ public void testProcessingTime() throws Exception { ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), AfterEach.inOrder( Repeatedly - .forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + .forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(5))) .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(25)))), AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of(), Duration.millis(100));