Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,22 @@ private Collection<W> processElement(WindowedValue<InputT> value) throws Excepti
for (BoundedWindow untypedWindow : value.getWindows()) {
@SuppressWarnings("unchecked")
W window = (W) untypedWindow;

ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
if (triggerRunner.isClosed(directContext.state())) {
// This window has already been closed.
droppedDueToClosedWindow.addValue(1L);
WindowTracing.debug(
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
continue;
}

W active = activeWindows.representative(window);
Preconditions.checkState(active != null, "Window %s should have been added", window);
Preconditions.checkState(active != null, "Window %s has no representative", window);
windows.add(active);
}

Expand All @@ -450,24 +464,13 @@ private Collection<W> processElement(WindowedValue<InputT> value) throws Excepti
triggerRunner.prefetchForValue(window, directContext.state());
}

// Process the element for each (representative) window it belongs to.
// Process the element for each (representative, not closed) window it belongs to.
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);

// Check to see if the triggerRunner thinks the window is closed. If so, drop that window.
if (triggerRunner.isClosed(directContext.state())) {
droppedDueToClosedWindow.addValue(1L);
WindowTracing.debug(
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
continue;
}

nonEmptyPanes.recordContent(renamedContext.state());

// Make sure we've scheduled the end-of-window or garbage collection timer for this window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,44 @@ public void testMergingWithCloseBeforeGC() throws Exception {
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
* If a later event tries to reuse an earlier session window which has been closed, we
* should reject that element and not fail due to the window no longer having a representative.
*/
@Test
public void testMergingWithReusedWindow() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);

// One elements in one session window.
tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.

// Close the trigger, but the gargbage collection timer is still pending.
when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
triggerShouldFinish(mockTrigger);
tester.advanceInputWatermark(new Instant(15));

// Another element in the same session window.
// Should be discarded with 'window closed'.
tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21.

// Now the garbage collection timer will fire, finding the trigger already closed.
tester.advanceInputWatermark(new Instant(100));

List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output.size(), equalTo(1));
assertThat(output.get(0),
isSingleWindowedValue(containsInAnyOrder(1),
1, // timestamp
1, // window start
11)); // window end
assertThat(
output.get(0).getPane(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
* Tests that when data is assigned to multiple windows but some of those windows have
* had their triggers finish, then the data is dropped and counted accurately.
Expand Down