From 2ad027f49adbcb80ffac87c3fffa81fd68c89e86 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 31 Mar 2016 15:03:59 -0700 Subject: [PATCH] Repeatedly#onFire should clear all finished bits Previously, Repeatedly#onFire only cleared the finished bits associated with the root of the sub-tree, as demonstrated by the new unit tests. This led to problems with AfterFirst#shouldFire, which checked to see if any of the sub-triggers have their finished bits set. Now, Repeatedly#onFire calls #resetTree, which clears all the finished bits in the entire sub-tree. --- .../sdk/transforms/windowing/Repeatedly.java | 4 +- .../transforms/windowing/RepeatedlyTest.java | 83 +++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java index 3416551ddccb..9be0259dea14 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Repeatedly.java @@ -90,8 +90,8 @@ public void onFire(TriggerContext context) throws Exception { getRepeated(context).invokeOnFire(context); if (context.trigger().isFinished(REPEATED)) { - context.trigger().setFinished(false, REPEATED); - getRepeated(context).invokeClear(context); + // Reset tree will recursively clear the finished bits, and invoke clear. + context.forTrigger(getRepeated(context)).trigger().resetTree(); } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java index ddfec1cdcc9c..99907b20ebdf 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/windowing/RepeatedlyTest.java @@ -126,4 +126,87 @@ public void testShouldFireAfterMerge() throws Exception { IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); assertTrue(tester.shouldFire(mergedWindow)); } + + @Test + public void testRepeatedlyAfterFirstElementCount() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyAfterFirstProcessingTime() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterFirst.of( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15)), + AfterPane.elementCountAtLeast(5))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyElementCount() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever(AfterPane.elementCountAtLeast(5)), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.injectElements(2, 3, 4, 5); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + + @Test + public void testRepeatedlyProcessingTime() throws Exception { + SimpleTriggerTester tester = + TriggerTester.forTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(15))), + new GlobalWindows()); + + GlobalWindow window = GlobalWindow.INSTANCE; + + tester.injectElements(1); + assertFalse(tester.shouldFire(window)); + + tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); + assertTrue(tester.shouldFire(window)); + tester.fireIfShouldFire(window); + assertFalse(tester.shouldFire(window)); + } + }