diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index d62bcc96e708..2415dabaedc3 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -438,8 +438,22 @@ private Collection processElement(WindowedValue value) throws Excepti for (BoundedWindow untypedWindow : value.getWindows()) { @SuppressWarnings("unchecked") W window = (W) untypedWindow; + + ReduceFn.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + if (triggerRunner.isClosed(directContext.state())) { + // This window has already been closed. + droppedDueToClosedWindow.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", + value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + continue; + } + W active = activeWindows.representative(window); - Preconditions.checkState(active != null, "Window %s should have been added", window); + Preconditions.checkState(active != null, "Window %s has no representative", window); windows.add(active); } @@ -450,24 +464,13 @@ private Collection processElement(WindowedValue value) throws Excepti triggerRunner.prefetchForValue(window, directContext.state()); } - // Process the element for each (representative) window it belongs to. + // Process the element for each (representative, not closed) window it belongs to. 
for (W window : windows) { ReduceFn.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); ReduceFn.ProcessValueContext renamedContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); - // Check to see if the triggerRunner thinks the window is closed. If so, drop that window. - if (triggerRunner.isClosed(directContext.state())) { - droppedDueToClosedWindow.addValue(1L); - WindowTracing.debug( - "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", - value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - continue; - } - nonEmptyPanes.recordContent(renamedContext.state()); // Make sure we've scheduled the end-of-window or garbage collection timer for this window. diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index 10b886b6e4d7..b58e360bd47d 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -724,6 +724,44 @@ public void testMergingWithCloseBeforeGC() throws Exception { equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); } + /** + * If a later event tries to reuse an earlier session window which has been closed, we + * should reject that element and not fail due to the window no longer having a representative. 
+ */ + @Test + public void testMergingWithReusedWindow() throws Exception { + ReduceFnTester, IntervalWindow> tester = + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), + ClosingBehavior.FIRE_IF_NON_EMPTY); + + // One element in one session window. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. + + // Close the trigger, but the garbage collection timer is still pending. + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTrigger); + tester.advanceInputWatermark(new Instant(15)); + + // Another element in the same session window. + // Should be discarded with 'window closed'. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. + + // Now the garbage collection timer will fire, finding the trigger already closed. + tester.advanceInputWatermark(new Instant(100)); + + List>> output = tester.extractOutput(); + assertThat(output.size(), equalTo(1)); + assertThat(output.get(0), + isSingleWindowedValue(containsInAnyOrder(1), + 1, // timestamp + 1, // window start + 11)); // window end + assertThat( + output.get(0).getPane(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + } + /** * Tests that when data is assigned to multiple windows but some of those windows have * had their triggers finish, then the data is dropped and counted accurately.