Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
} else {
nonLateElements.add(
WindowedValue.of(
element.getValue(), element.getTimestamp(), window, element.getPane()));
element.getValue(), element.getTimestamp(), window, element.getPaneInfo()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public Instant timestamp() {

@Override
public PaneInfo pane() {
return element.getPane();
return element.getPaneInfo();
}

@Override
Expand All @@ -390,7 +390,7 @@ public void output(OutputT output) {

@Override
public void outputWithTimestamp(OutputT value, Instant timestamp) {
outputWindowedValue(value, timestamp, element.getWindows(), element.getPane());
outputWindowedValue(value, timestamp, element.getWindows(), element.getPaneInfo());
}

@Override
Expand All @@ -413,7 +413,7 @@ public <T> void output(TupleTag<T> tag, T value) {

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPaneInfo());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public <T> T sideInput(PCollectionView<T> view) {

@Override
public PaneInfo pane() {
return elem.getPane();
return elem.getPaneInfo();
}

@Override
Expand Down Expand Up @@ -436,7 +436,7 @@ public <T> void output(TupleTag<T> tag, T output) {
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
checkTimestamp(elem.getTimestamp(), timestamp);
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPane());
outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPaneInfo());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ public PipelineOptions pipelineOptions() {

@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getPane();
return elementAndRestriction.getKey().getPaneInfo();
}

@Override
Expand Down Expand Up @@ -490,7 +490,7 @@ public PipelineOptions pipelineOptions() {

@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getPane();
return elementAndRestriction.getKey().getPaneInfo();
}

@Override
Expand Down Expand Up @@ -544,7 +544,7 @@ public PipelineOptions pipelineOptions() {

@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return elementAndRestriction.getKey().getPane();
return elementAndRestriction.getKey().getPaneInfo();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,8 @@ public void testWatermarkHoldAndLateData() throws Exception {
equalTo(new Instant(1)),
equalTo((BoundedWindow) expectedWindow))));
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));
output.get(0).getPaneInfo(),
equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)));

// There is no end-of-window hold, but the timer set by the trigger holds the watermark
assertThat(tester.getWatermarkHold(), nullValue());
Expand Down Expand Up @@ -805,7 +806,8 @@ public void testWatermarkHoldAndLateData() throws Exception {
0, // window start
10))); // window end
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
output.get(0).getPaneInfo(),
equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));

// Since the element hold is cleared, there is no hold remaining
assertThat(tester.getWatermarkHold(), nullValue());
Expand Down Expand Up @@ -846,7 +848,8 @@ public void testWatermarkHoldAndLateData() throws Exception {
0, // window start
10))); // window end
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));
output.get(0).getPaneInfo(),
equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)));

tester.setAutoAdvanceOutputWatermark(true);

Expand Down Expand Up @@ -879,7 +882,7 @@ public void testWatermarkHoldAndLateData() throws Exception {
0, // window start
10))); // window end
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1)));
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1)));
assertEquals(new Instant(50), tester.getOutputWatermark());
assertEquals(null, tester.getWatermarkHold());

Expand Down Expand Up @@ -1486,7 +1489,8 @@ public void testMergeBeforeFinalizing() throws Exception {
1, // window start
20)); // window end
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
output.get(0).getPaneInfo(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
Expand Down Expand Up @@ -1527,7 +1531,8 @@ public void testMergingWithCloseBeforeGC() throws Exception {
1, // window start
20)); // window end
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
output.get(0).getPaneInfo(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/** Ensure a closed trigger has its state recorded in the merge result window. */
Expand Down Expand Up @@ -1614,7 +1619,8 @@ public void testMergingWithReusedWindow() throws Exception {
equalTo((BoundedWindow) mergedWindow)));

assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
output.get(0).getPaneInfo(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
Expand Down Expand Up @@ -1661,7 +1667,7 @@ public void testMergingWithClosedRepresentative() throws Exception {
1, // window start
18)); // window end
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
}

/**
Expand Down Expand Up @@ -1702,7 +1708,7 @@ public void testMergingWithClosedDoesNotPoison() throws Exception {
2, // window start
12)); // window end
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0)));
assertThat(
output.get(1),
isSingleWindowedValue(
Expand All @@ -1711,7 +1717,8 @@ public void testMergingWithClosedDoesNotPoison() throws Exception {
1, // window start
13)); // window end
assertThat(
output.get(1).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
output.get(1).getPaneInfo(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
Expand Down Expand Up @@ -1811,7 +1818,7 @@ public void testIdempotentEmptyPanesDiscarding() throws Exception {
// The late pane has the correct indices.
assertThat(output.get(1).getValue(), contains(3));
assertThat(
output.get(1).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
output.get(1).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));

assertTrue(tester.isMarkedFinished(firstWindow));
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
Expand Down Expand Up @@ -1850,7 +1857,8 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception {
assertThat(output.size(), equalTo(1));
assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10));
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
output.get(0).getPaneInfo(),
equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));

// Fire another timer with no data; the empty pane should not be output even though the
// trigger is ready to fire
Expand All @@ -1868,7 +1876,7 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception {
// The late pane has the correct indices.
assertThat(output.get(0).getValue(), containsInAnyOrder(1, 2, 3));
assertThat(
output.get(0).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
output.get(0).getPaneInfo(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));

assertTrue(tester.isMarkedFinished(firstWindow));
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
Expand Down Expand Up @@ -2193,17 +2201,17 @@ public void fireNonEmptyOnDrainInGlobalWindow() throws Exception {
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertEquals(n / 3, output.size());
for (int i = 0; i < output.size(); i++) {
assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
assertEquals(i, output.get(i).getPane().getIndex());
assertEquals(Timing.EARLY, output.get(i).getPaneInfo().getTiming());
assertEquals(i, output.get(i).getPaneInfo().getIndex());
assertEquals(3, Iterables.size(output.get(i).getValue()));
}

tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

output = tester.extractOutput();
assertEquals(1, output.size());
assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
assertEquals(n / 3, output.get(0).getPane().getIndex());
assertEquals(Timing.ON_TIME, output.get(0).getPaneInfo().getTiming());
assertEquals(n / 3, output.get(0).getPaneInfo().getIndex());
assertEquals(n - ((n / 3) * 3), Iterables.size(output.get(0).getValue()));
}

Expand Down Expand Up @@ -2231,17 +2239,17 @@ public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertEquals((n + 3) / 4, output.size());
for (int i = 0; i < output.size(); i++) {
assertEquals(Timing.EARLY, output.get(i).getPane().getTiming());
assertEquals(i, output.get(i).getPane().getIndex());
assertEquals(Timing.EARLY, output.get(i).getPaneInfo().getTiming());
assertEquals(i, output.get(i).getPaneInfo().getIndex());
assertEquals(4, Iterables.size(output.get(i).getValue()));
}

tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

output = tester.extractOutput();
assertEquals(1, output.size());
assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming());
assertEquals((n + 3) / 4, output.get(0).getPane().getIndex());
assertEquals(Timing.ON_TIME, output.get(0).getPaneInfo().getTiming());
assertEquals((n + 3) / 4, output.get(0).getPaneInfo().getIndex());
assertEquals(0, Iterables.size(output.get(0).getValue()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ public void describeTo(Description description) {

@Override
protected boolean matchesSafely(WindowedValue<? extends T> item) {
return Objects.equals(item.getPane(), paneInfo);
return Objects.equals(item.getPaneInfo(), paneInfo);
}

@Override
protected void describeMismatchSafely(
WindowedValue<? extends T> item, Description mismatchDescription) {
mismatchDescription.appendValue(item.getPane());
mismatchDescription.appendValue(item.getPaneInfo());
}
};
}
Expand Down Expand Up @@ -212,7 +212,7 @@ protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) {
return valueMatcher.matches(windowedValue.getValue())
&& timestampMatcher.matches(windowedValue.getTimestamp())
&& windowsMatcher.matches(windowedValue.getWindows())
&& paneInfoMatcher.matches(windowedValue.getPane());
&& paneInfoMatcher.matches(windowedValue.getPaneInfo());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private void updatePCollectionViewWindowValues(
// the value had never been set, so we set it and are done.
return;
}
PaneInfo newPane = windowValues.iterator().next().getPane();
PaneInfo newPane = windowValues.iterator().next().getPaneInfo();

Iterable<? extends WindowedValue<?>> existingValues;
long existingPane;
Expand All @@ -168,7 +168,7 @@ private void updatePCollectionViewWindowValues(
existingPane =
Iterables.isEmpty(existingValues)
? -1L
: existingValues.iterator().next().getPane().getIndex();
: existingValues.iterator().next().getPaneInfo().getIndex();
} while (newPane.getIndex() > existingPane
&& !contents.compareAndSet(existingValues, windowValues));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void processElement(WindowedValue<InputT> compressedElement) throws Excep
Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
outputBundle.add(
WindowedValue.of(
element.getValue(), element.getTimestamp(), windows, element.getPane()));
element.getValue(), element.getTimestamp(), windows, element.getPaneInfo()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,24 @@ public void multipleWindowsWindowFnSucceeds() throws Exception {
valueInIntervalWindow.getValue(),
valueInIntervalWindow.getTimestamp(),
ImmutableSet.of(wMinus1, wMinusSlide),
valueInIntervalWindow.getPane()),
valueInIntervalWindow.getPaneInfo()),

// Value in three windows mapped to three windowed values in the same multiple windows
isWindowedValue(
valueInGlobalAndTwoIntervalWindows.getValue(),
valueInGlobalAndTwoIntervalWindows.getTimestamp(),
ImmutableSet.of(w1, w2),
valueInGlobalAndTwoIntervalWindows.getPane()),
valueInGlobalAndTwoIntervalWindows.getPaneInfo()),
isWindowedValue(
valueInGlobalAndTwoIntervalWindows.getValue(),
valueInGlobalAndTwoIntervalWindows.getTimestamp(),
ImmutableSet.of(w1, w2),
valueInGlobalAndTwoIntervalWindows.getPane()),
valueInGlobalAndTwoIntervalWindows.getPaneInfo()),
isWindowedValue(
valueInGlobalAndTwoIntervalWindows.getValue(),
valueInGlobalAndTwoIntervalWindows.getTimestamp(),
ImmutableSet.of(w1, w2),
valueInGlobalAndTwoIntervalWindows.getPane())));
valueInGlobalAndTwoIntervalWindows.getPaneInfo())));
}

@Test
Expand All @@ -219,14 +219,14 @@ public void referencesEarlierWindowsSucceeds() throws Exception {
new IntervalWindow(
valueInGlobalWindow.getTimestamp(),
valueInGlobalWindow.getTimestamp().plus(Duration.millis(1L))),
valueInGlobalWindow.getPane()),
valueInGlobalWindow.getPaneInfo()),

// Value in interval window mapped to the same window
isWindowedValue(
valueInIntervalWindow.getValue(),
valueInIntervalWindow.getTimestamp(),
valueInIntervalWindow.getWindows(),
valueInIntervalWindow.getPane()),
valueInIntervalWindow.getPaneInfo()),

// Value in global window and two interval windows exploded and mapped in both ways
isSingleWindowedValue(
Expand All @@ -235,17 +235,17 @@ public void referencesEarlierWindowsSucceeds() throws Exception {
new IntervalWindow(
valueInGlobalAndTwoIntervalWindows.getTimestamp(),
valueInGlobalAndTwoIntervalWindows.getTimestamp().plus(Duration.millis(1L))),
valueInGlobalAndTwoIntervalWindows.getPane()),
valueInGlobalAndTwoIntervalWindows.getPaneInfo()),
isSingleWindowedValue(
valueInGlobalAndTwoIntervalWindows.getValue(),
valueInGlobalAndTwoIntervalWindows.getTimestamp(),
intervalWindow1,
valueInGlobalAndTwoIntervalWindows.getPane()),
valueInGlobalAndTwoIntervalWindows.getPaneInfo()),
isSingleWindowedValue(
valueInGlobalAndTwoIntervalWindows.getValue(),
valueInGlobalAndTwoIntervalWindows.getTimestamp(),
intervalWindow2,
valueInGlobalAndTwoIntervalWindows.getPane())));
valueInGlobalAndTwoIntervalWindows.getPaneInfo())));
}

private CommittedBundle<Long> createInputBundle() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ public void collect(WindowedValue<ValueWithRecordId<OutputT>> element) {
OutputT originalValue = element.getValue().getValue();
WindowedValue<OutputT> output =
WindowedValue.of(
originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
originalValue, element.getTimestamp(), element.getWindows(), element.getPaneInfo());
ctx.collect(output);
}

Expand All @@ -1414,7 +1414,7 @@ public void collectWithTimestamp(
OutputT originalValue = element.getValue().getValue();
WindowedValue<OutputT> output =
WindowedValue.of(
originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
originalValue, element.getTimestamp(), element.getWindows(), element.getPaneInfo());
ctx.collectWithTimestamp(output, timestamp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void flatMap(WindowedValue<T> input, Collector<WindowedValue<T>> collecto
Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input));
for (W window : windows) {
collector.collect(
WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane()));
WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPaneInfo()));
}
}
}
Loading
Loading