From da166bf8b156a6a0bd1335b4f002f9f7f49f972a Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Thu, 31 Mar 2016 10:57:55 -0700 Subject: [PATCH] Drop elements in closed windows before mapping window Previously, the sequence was: 1. Map a window to a representative of its equivalence class according to merging. 2. Drop the element if that window was closed. But this crashes if the original window was already closed. The new sequence is reversed. This is safe, because it is not possible to map to a representative which is closed, as it is no longer a candidate for merges. --- .../dataflow/sdk/util/ReduceFnRunner.java | 29 +++++++------- .../dataflow/sdk/util/ReduceFnRunnerTest.java | 38 +++++++++++++++++++ 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 560d8ecd44..7fc7ccc34d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -436,8 +436,22 @@ private Collection processElement(WindowedValue value) throws Excepti for (BoundedWindow untypedWindow : value.getWindows()) { @SuppressWarnings("unchecked") W window = (W) untypedWindow; + + ReduceFn.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + if (triggerRunner.isClosed(directContext.state())) { + // This window has already been closed. + droppedDueToClosedWindow.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", + value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + continue; + } + W active = activeWindows.representative(window); - Preconditions.checkState(active != null, "Window %s should have been added", window); + Preconditions.checkState(active != null, "Window %s has no representative", window); windows.add(active); } @@ -448,24 +462,13 @@ private Collection processElement(WindowedValue value) throws Excepti triggerRunner.prefetchForValue(window, directContext.state()); } - // Process the element for each (representative) window it belongs to. + // Process the element for each (representative, not closed) window it belongs to. for (W window : windows) { ReduceFn.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); ReduceFn.ProcessValueContext renamedContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); - // Check to see if the triggerRunner thinks the window is closed. If so, drop that window. - if (triggerRunner.isClosed(directContext.state())) { - droppedDueToClosedWindow.addValue(1L); - WindowTracing.debug( - "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", - value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - continue; - } - nonEmptyPanes.recordContent(renamedContext.state()); // Make sure we've scheduled the end-of-window or garbage collection timer for this window. diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index c60af85572..646accd252 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -723,6 +723,44 @@ public void testMergingWithCloseBeforeGC() throws Exception { equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); } + /** + * If a later event tries to reuse an earlier session window which has been closed, we + * should reject that element and not fail due to the window no longer having a representative. + */ + @Test + public void testMergingWithReusedWindow() throws Exception { + ReduceFnTester, IntervalWindow> tester = + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), + ClosingBehavior.FIRE_IF_NON_EMPTY); + + // One elements in one session window. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. + + // Close the trigger, but the gargbage collection timer is still pending. + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTrigger); + tester.advanceInputWatermark(new Instant(15)); + + // Another element in the same session window. + // Should be discarded with 'window closed'. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. + + // Now the garbage collection timer will fire, finding the trigger already closed. + tester.advanceInputWatermark(new Instant(100)); + + List>> output = tester.extractOutput(); + assertThat(output.size(), equalTo(1)); + assertThat(output.get(0), + isSingleWindowedValue(containsInAnyOrder(1), + 1, // timestamp + 1, // window start + 11)); // window end + assertThat( + output.get(0).getPane(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + } + /** * Tests that when data is assigned to multiple windows but some of those windows have * had their triggers finish, then the data is dropped and counted accurately.