diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java index e77e2a1203..5f29c8ffe3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java @@ -89,8 +89,8 @@ public void onFire(TriggerContext context) throws Exception { getRepeated(context).invokeOnFire(context); if (context.trigger().isFinished(REPEATED)) { - context.trigger().setFinished(false, REPEATED); - getRepeated(context).invokeClear(context); + // Reset tree will recursively clear the finished bits, and invoke clear. + context.forTrigger(getRepeated(context)).trigger().resetTree(); } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java index f445b52565..e14c15c4e2 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java @@ -125,4 +125,87 @@ public void testShouldFireAfterMerge() throws Exception { IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); assertTrue(tester.shouldFire(mergedWindow)); } + + @Test + public void testRepeatedlyAfterFirstElementCount() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyAfterFirstProcessingTime() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyElementCount() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(5)), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyProcessingTime() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + }